diff --git a/dulwich/patch.py b/dulwich/patch.py index cab8433c..4ff7caae 100644 --- a/dulwich/patch.py +++ b/dulwich/patch.py @@ -1,363 +1,363 @@ # patch.py -- For dealing with packed-style patches. # Copyright (C) 2009-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Classes for dealing with git am-style patches. These patches are basically unified diffs with some extra metadata tacked on. """ from difflib import SequenceMatcher import email.parser import time from dulwich.objects import ( Blob, Commit, S_ISGITLINK, ) FIRST_FEW_BYTES = 8000 def write_commit_patch(f, commit, contents, progress, version=None, encoding=None): """Write a individual file patch. :param commit: Commit object :param progress: Tuple with current patch number and total. :return: tuple with filename and contents """ encoding = encoding or getattr(f, "encoding", "ascii") if isinstance(contents, str): contents = contents.encode(encoding) (num, total) = progress f.write(b"From " + commit.id + b" " + time.ctime(commit.commit_time).encode(encoding) + b"\n") f.write(b"From: " + commit.author + b"\n") f.write(b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n") f.write(("Subject: [PATCH %d/%d] " % (num, total)).encode(encoding) + commit.message + b"\n") f.write(b"\n") f.write(b"---\n") try: import subprocess p = subprocess.Popen(["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) except (ImportError, OSError): pass # diffstat not available? else: (diffstat, _) = p.communicate(contents) f.write(diffstat) f.write(b"\n") f.write(contents) f.write(b"-- \n") if version is None: from dulwich import __version__ as dulwich_version f.write(b"Dulwich %d.%d.%d\n" % dulwich_version) else: f.write(version.encode(encoding) + b"\n") def get_summary(commit): """Determine the summary line for use in a filename. :param commit: Commit :return: Summary string """ return commit.message.splitlines()[0].replace(" ", "-") # Unified Diff def _format_range_unified(start, stop): 'Convert range to the "ed" format' # Per the diff spec at http://www.unix.org/single_unix_specification/ beginning = start + 1 # lines start numbering with one length = stop - start if length == 1: return '{}'.format(beginning) if not length: beginning -= 1 # empty ranges begin at line just before the range return '{},{}'.format(beginning, length) def unified_diff(a, b, fromfile='', tofile='', fromfiledate='', tofiledate='', n=3, lineterm='\n'): """difflib.unified_diff that can detect "No newline at end of file" as original "git diff" does. Based on the same function in Python2.7 difflib.py """ started = False for group in SequenceMatcher(None, a, b).get_grouped_opcodes(n): if not started: started = True fromdate = '\t{}'.format(fromfiledate) if fromfiledate else '' todate = '\t{}'.format(tofiledate) if tofiledate else '' yield '--- {}{}{}'.format( fromfile.decode("ascii"), fromdate, lineterm ).encode('ascii') yield '+++ {}{}{}'.format( tofile.decode("ascii"), todate, lineterm ).encode('ascii') first, last = group[0], group[-1] file1_range = _format_range_unified(first[1], last[2]) file2_range = _format_range_unified(first[3], last[4]) yield '@@ -{} +{} @@{}'.format( file1_range, file2_range, lineterm ).encode('ascii') for tag, i1, i2, j1, j2 in group: if tag == 'equal': for line in a[i1:i2]: yield b' ' + line continue if tag in ('replace', 'delete'): for line in a[i1:i2]: if not line[-1:] == b'\n': line += b'\n\\ No newline at end of file\n' yield b'-' + line if tag in ('replace', 'insert'): for line in b[j1:j2]: if not line[-1:] == b'\n': line += b'\n\\ No newline at end of file\n' yield b'+' + line def is_binary(content): """See if the first few bytes contain any null characters. :param content: Bytestring to check for binary content """ return b'\0' in content[:FIRST_FEW_BYTES] def shortid(hexsha): if hexsha is None: return b"0" * 7 else: return hexsha[:7] def patch_filename(p, root): if p is None: return b"/dev/null" else: return root + b"/" + p def write_object_diff(f, store, old_file, new_file, diff_binary=False): """Write the diff for an object. :param f: File-like object to write to :param store: Store to retrieve objects from, if necessary :param old_file: (path, mode, hexsha) tuple :param new_file: (path, mode, hexsha) tuple :param diff_binary: Whether to diff files even if they are considered binary files by is_binary(). :note: the tuple elements should be None for nonexistant files """ (old_path, old_mode, old_id) = old_file (new_path, new_mode, new_id) = new_file patched_old_path = patch_filename(old_path, b"a") patched_new_path = patch_filename(new_path, b"b") def content(mode, hexsha): if hexsha is None: return Blob.from_string(b'') elif S_ISGITLINK(mode): return Blob.from_string(b"Submodule commit " + hexsha + b"\n") else: return store[hexsha] def lines(content): if not content: return [] else: return content.splitlines() f.writelines(gen_diff_header( (old_path, new_path), (old_mode, new_mode), (old_id, new_id))) old_content = content(old_mode, old_id) new_content = content(new_mode, new_id) if not diff_binary and ( is_binary(old_content.data) or is_binary(new_content.data)): binary_diff = ( b"Binary files " + patched_old_path + b" and " + patched_new_path + b" differ\n" ) f.write(binary_diff) else: f.writelines(unified_diff(lines(old_content), lines(new_content), patched_old_path, patched_new_path)) # TODO(jelmer): Support writing unicode, rather than bytes. def gen_diff_header(paths, modes, shas): """Write a blob diff header. :param paths: Tuple with old and new path :param modes: Tuple with old and new modes :param shas: Tuple with old and new shas """ (old_path, new_path) = paths (old_mode, new_mode) = modes (old_sha, new_sha) = shas if old_path is None and new_path is not None: old_path = new_path if new_path is None and old_path is not None: new_path = old_path old_path = patch_filename(old_path, b"a") new_path = patch_filename(new_path, b"b") yield b"diff --git " + old_path + b" " + new_path + b"\n" if old_mode != new_mode: if new_mode is not None: if old_mode is not None: yield ("old file mode %o\n" % old_mode).encode('ascii') yield ("new file mode %o\n" % new_mode).encode('ascii') else: yield ("deleted file mode %o\n" % old_mode).encode('ascii') yield b"index " + shortid(old_sha) + b".." + shortid(new_sha) - if new_mode is not None: + if new_mode is not None and old_mode is not None: yield (" %o" % new_mode).encode('ascii') yield b"\n" # TODO(jelmer): Support writing unicode, rather than bytes. def write_blob_diff(f, old_file, new_file): """Write blob diff. :param f: File-like object to write to :param old_file: (path, mode, hexsha) tuple (None if nonexisting) :param new_file: (path, mode, hexsha) tuple (None if nonexisting) :note: The use of write_object_diff is recommended over this function. """ (old_path, old_mode, old_blob) = old_file (new_path, new_mode, new_blob) = new_file patched_old_path = patch_filename(old_path, b"a") patched_new_path = patch_filename(new_path, b"b") def lines(blob): if blob is not None: return blob.splitlines() else: return [] f.writelines(gen_diff_header( (old_path, new_path), (old_mode, new_mode), (getattr(old_blob, "id", None), getattr(new_blob, "id", None)))) old_contents = lines(old_blob) new_contents = lines(new_blob) f.writelines(unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)) def write_tree_diff(f, store, old_tree, new_tree, diff_binary=False): """Write tree diff. :param f: File-like object to write to. :param old_tree: Old tree id :param new_tree: New tree id :param diff_binary: Whether to diff files even if they are considered binary files by is_binary(). """ changes = store.tree_changes(old_tree, new_tree) for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes: write_object_diff(f, store, (oldpath, oldmode, oldsha), (newpath, newmode, newsha), diff_binary=diff_binary) def git_am_patch_split(f, encoding=None): """Parse a git-am-style patch and split it up into bits. :param f: File-like object to parse :param encoding: Encoding to use when creating Git objects :return: Tuple with commit object, diff contents and git version """ encoding = encoding or getattr(f, "encoding", "ascii") encoding = encoding or "ascii" contents = f.read() if (isinstance(contents, bytes) and getattr(email.parser, "BytesParser", None)): parser = email.parser.BytesParser() msg = parser.parsebytes(contents) else: parser = email.parser.Parser() msg = parser.parsestr(contents) return parse_patch_message(msg, encoding) def parse_patch_message(msg, encoding=None): """Extract a Commit object and patch from an e-mail message. :param msg: An email message (email.message.Message) :param encoding: Encoding to use to encode Git commits :return: Tuple with commit object, diff contents and git version """ c = Commit() c.author = msg["from"].encode(encoding) c.committer = msg["from"].encode(encoding) try: patch_tag_start = msg["subject"].index("[PATCH") except ValueError: subject = msg["subject"] else: close = msg["subject"].index("] ", patch_tag_start) subject = msg["subject"][close+2:] c.message = (subject.replace("\n", "") + "\n").encode(encoding) first = True body = msg.get_payload(decode=True) lines = body.splitlines(True) line_iter = iter(lines) for line in line_iter: if line == b"---\n": break if first: if line.startswith(b"From: "): c.author = line[len(b"From: "):].rstrip() else: c.message += b"\n" + line first = False else: c.message += line diff = b"" for line in line_iter: if line == b"-- \n": break diff += line try: version = next(line_iter).rstrip(b"\n") except StopIteration: version = None return c, diff, version diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py index 43f1f864..942b3bff 100644 --- a/dulwich/tests/test_patch.py +++ b/dulwich/tests/test_patch.py @@ -1,539 +1,539 @@ # test_patch.py -- tests for patch.py # Copyright (C) 2010 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for patch.py.""" from io import BytesIO, StringIO from dulwich.objects import ( Blob, Commit, S_IFGITLINK, Tree, ) from dulwich.object_store import ( MemoryObjectStore, ) from dulwich.patch import ( git_am_patch_split, write_blob_diff, write_commit_patch, write_object_diff, write_tree_diff, ) from dulwich.tests import ( SkipTest, TestCase, ) class WriteCommitPatchTests(TestCase): def test_simple_bytesio(self): f = BytesIO() c = Commit() c.committer = c.author = b"Jelmer " c.commit_time = c.author_time = 1271350201 c.commit_timezone = c.author_timezone = 0 c.message = b"This is the first line\nAnd this is the second line.\n" c.tree = Tree().id write_commit_patch(f, c, b"CONTENTS", (1, 1), version="custom") f.seek(0) lines = f.readlines() self.assertTrue(lines[0].startswith( b"From 0b0d34d1b5b596c928adc9a727a4b9e03d025298")) self.assertEqual(lines[1], b"From: Jelmer \n") self.assertTrue(lines[2].startswith(b"Date: ")) self.assertEqual([ b"Subject: [PATCH 1/1] This is the first line\n", b"And this is the second line.\n", b"\n", b"\n", b"---\n"], lines[3:8]) self.assertEqual([ b"CONTENTS-- \n", b"custom\n"], lines[-2:]) if len(lines) >= 12: # diffstat may not be present self.assertEqual(lines[8], b" 0 files changed\n") class ReadGitAmPatch(TestCase): def test_extract_string(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning). --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split( StringIO(text.decode("utf-8")), "utf-8") self.assertEqual(b"Jelmer Vernooij ", c.committer) self.assertEqual(b"Jelmer Vernooij ", c.author) self.assertEqual(b"Remove executable bit from prey.ico " b"(triggers a warning).\n", c.message) self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """, diff) self.assertEqual(b"1.7.0.4", version) def test_extract_bytes(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning). --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text)) self.assertEqual(b"Jelmer Vernooij ", c.committer) self.assertEqual(b"Jelmer Vernooij ", c.author) self.assertEqual(b"Remove executable bit from prey.ico " b"(triggers a warning).\n", c.message) self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """, diff) self.assertEqual(b"1.7.0.4", version) def test_extract_spaces(self): text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(b'''\ Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. ''', c.message) def test_extract_pseudo_from_header(self): text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. From: Jelmer Vernooij * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(b"Jelmer Vernooij ", c.author) self.assertEqual(b'''\ Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. ''', c.message) def test_extract_no_version_tail(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. From: Jelmer Vernooij --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """ c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(None, version) def test_extract_mercurial(self): raise SkipTest( "git_am_patch_split doesn't handle Mercurial patches " "properly yet") expected_diff = """\ diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py --- a/dulwich/tests/test_patch.py +++ b/dulwich/tests/test_patch.py @@ -158,7 +158,7 @@ ''' c, diff, version = git_am_patch_split(BytesIO(text)) - self.assertIs(None, version) + self.assertEqual(None, version) class DiffTests(TestCase): """ # noqa: W291,W293 text = """\ From dulwich-users-bounces+jelmer=samba.org@lists.launchpad.net \ Mon Nov 29 00:58:18 2010 Date: Sun, 28 Nov 2010 17:57:27 -0600 From: Augie Fackler To: dulwich-users Subject: [Dulwich-users] [PATCH] test_patch: fix tests on Python 2.6 Content-Transfer-Encoding: 8bit Change-Id: I5e51313d4ae3a65c3f00c665002a7489121bb0d6 %s _______________________________________________ Mailing list: https://launchpad.net/~dulwich-users Post to : dulwich-users@lists.launchpad.net Unsubscribe : https://launchpad.net/~dulwich-users More help : https://help.launchpad.net/ListHelp """ % expected_diff # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text)) self.assertEqual(expected_diff, diff) self.assertEqual(None, version) class DiffTests(TestCase): """Tests for write_blob_diff and write_tree_diff.""" def test_blob_diff(self): f = BytesIO() write_blob_diff( f, (b"foo.txt", 0o644, Blob.from_string(b"old\nsame\n")), (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n"))) self.assertEqual([ b"diff --git a/foo.txt b/bar.txt", b"index 3b0f961..a116b51 644", b"--- a/foo.txt", b"+++ b/bar.txt", b"@@ -1,2 +1,2 @@", b"-old", b"+new", b" same" ], f.getvalue().splitlines()) def test_blob_add(self): f = BytesIO() write_blob_diff( f, (None, None, None), (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n"))) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'new file mode 644', - b'index 0000000..a116b51 644', + b'index 0000000..a116b51', b'--- /dev/null', b'+++ b/bar.txt', b'@@ -0,0 +1,2 @@', b'+new', b'+same' ], f.getvalue().splitlines()) def test_blob_remove(self): f = BytesIO() write_blob_diff( f, (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n")), (None, None, None)) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'deleted file mode 644', b'index a116b51..0000000', b'--- a/bar.txt', b'+++ /dev/null', b'@@ -1,2 +0,0 @@', b'-new', b'-same' ], f.getvalue().splitlines()) def test_tree_diff(self): f = BytesIO() store = MemoryObjectStore() added = Blob.from_string(b"add\n") removed = Blob.from_string(b"removed\n") changed1 = Blob.from_string(b"unchanged\nremoved\n") changed2 = Blob.from_string(b"unchanged\nadded\n") unchanged = Blob.from_string(b"unchanged\n") tree1 = Tree() tree1.add(b"removed.txt", 0o644, removed.id) tree1.add(b"changed.txt", 0o644, changed1.id) tree1.add(b"unchanged.txt", 0o644, changed1.id) tree2 = Tree() tree2.add(b"added.txt", 0o644, added.id) tree2.add(b"changed.txt", 0o644, changed2.id) tree2.add(b"unchanged.txt", 0o644, changed1.id) store.add_objects([(o, None) for o in [ tree1, tree2, added, removed, changed1, changed2, unchanged]]) write_tree_diff(f, store, tree1.id, tree2.id) self.assertEqual([ b'diff --git a/added.txt b/added.txt', b'new file mode 644', - b'index 0000000..76d4bb8 644', + b'index 0000000..76d4bb8', b'--- /dev/null', b'+++ b/added.txt', b'@@ -0,0 +1 @@', b'+add', b'diff --git a/changed.txt b/changed.txt', b'index bf84e48..1be2436 644', b'--- a/changed.txt', b'+++ b/changed.txt', b'@@ -1,2 +1,2 @@', b' unchanged', b'-removed', b'+added', b'diff --git a/removed.txt b/removed.txt', b'deleted file mode 644', b'index 2c3f0b3..0000000', b'--- a/removed.txt', b'+++ /dev/null', b'@@ -1 +0,0 @@', b'-removed', ], f.getvalue().splitlines()) def test_tree_diff_submodule(self): f = BytesIO() store = MemoryObjectStore() tree1 = Tree() tree1.add(b"asubmodule", S_IFGITLINK, b"06d0bdd9e2e20377b3180e4986b14c8549b393e4") tree2 = Tree() tree2.add(b"asubmodule", S_IFGITLINK, b"cc975646af69f279396d4d5e1379ac6af80ee637") store.add_objects([(o, None) for o in [tree1, tree2]]) write_tree_diff(f, store, tree1.id, tree2.id) self.assertEqual([ b'diff --git a/asubmodule b/asubmodule', b'index 06d0bdd..cc97564 160000', b'--- a/asubmodule', b'+++ b/asubmodule', b'@@ -1 +1 @@', b'-Submodule commit 06d0bdd9e2e20377b3180e4986b14c8549b393e4', b'+Submodule commit cc975646af69f279396d4d5e1379ac6af80ee637', ], f.getvalue().splitlines()) def test_object_diff_blob(self): f = BytesIO() b1 = Blob.from_string(b"old\nsame\n") b2 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff(f, store, (b"foo.txt", 0o644, b1.id), (b"bar.txt", 0o644, b2.id)) self.assertEqual([ b"diff --git a/foo.txt b/bar.txt", b"index 3b0f961..a116b51 644", b"--- a/foo.txt", b"+++ b/bar.txt", b"@@ -1,2 +1,2 @@", b"-old", b"+new", b" same" ], f.getvalue().splitlines()) def test_object_diff_add_blob(self): f = BytesIO() store = MemoryObjectStore() b2 = Blob.from_string(b"new\nsame\n") store.add_object(b2) write_object_diff(f, store, (None, None, None), (b"bar.txt", 0o644, b2.id)) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'new file mode 644', - b'index 0000000..a116b51 644', + b'index 0000000..a116b51', b'--- /dev/null', b'+++ b/bar.txt', b'@@ -0,0 +1,2 @@', b'+new', b'+same' ], f.getvalue().splitlines()) def test_object_diff_remove_blob(self): f = BytesIO() b1 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_object(b1) write_object_diff(f, store, (b"bar.txt", 0o644, b1.id), (None, None, None)) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'deleted file mode 644', b'index a116b51..0000000', b'--- a/bar.txt', b'+++ /dev/null', b'@@ -1,2 +0,0 @@', b'-new', b'-same' ], f.getvalue().splitlines()) def test_object_diff_bin_blob_force(self): f = BytesIO() # Prepare two slightly different PNG headers b1 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x04\x00\x00\x00\x05\x04\x8b") b2 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x03\x00\x00\x00\x98\xd3\xb3") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff( f, store, (b'foo.png', 0o644, b1.id), (b'bar.png', 0o644, b2.id), diff_binary=True) self.assertEqual([ b'diff --git a/foo.png b/bar.png', b'index f73e47d..06364b7 644', b'--- a/foo.png', b'+++ b/bar.png', b'@@ -1,4 +1,4 @@', b' \x89PNG', b' \x1a', b' \x00\x00\x00', b'-IHDR\x00\x00\x01\xd5\x00\x00\x00' b'\x9f\x08\x04\x00\x00\x00\x05\x04\x8b', b'\\ No newline at end of file', b'+IHDR\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x03\x00\x00\x00\x98\xd3\xb3', b'\\ No newline at end of file' ], f.getvalue().splitlines()) def test_object_diff_bin_blob(self): f = BytesIO() # Prepare two slightly different PNG headers b1 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x04\x00\x00\x00\x05\x04\x8b") b2 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x03\x00\x00\x00\x98\xd3\xb3") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff(f, store, (b'foo.png', 0o644, b1.id), (b'bar.png', 0o644, b2.id)) self.assertEqual([ b'diff --git a/foo.png b/bar.png', b'index f73e47d..06364b7 644', b'Binary files a/foo.png and b/bar.png differ' ], f.getvalue().splitlines()) def test_object_diff_add_bin_blob(self): f = BytesIO() b2 = Blob.from_string( b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a' b'\x00\x00\x00\x0d\x49\x48\x44\x52' b'\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x03\x00\x00\x00\x98\xd3\xb3') store = MemoryObjectStore() store.add_object(b2) write_object_diff(f, store, (None, None, None), (b'bar.png', 0o644, b2.id)) self.assertEqual([ b'diff --git a/bar.png b/bar.png', b'new file mode 644', - b'index 0000000..06364b7 644', + b'index 0000000..06364b7', b'Binary files /dev/null and b/bar.png differ' ], f.getvalue().splitlines()) def test_object_diff_remove_bin_blob(self): f = BytesIO() b1 = Blob.from_string( b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a' b'\x00\x00\x00\x0d\x49\x48\x44\x52' b'\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x04\x00\x00\x00\x05\x04\x8b') store = MemoryObjectStore() store.add_object(b1) write_object_diff(f, store, (b'foo.png', 0o644, b1.id), (None, None, None)) self.assertEqual([ b'diff --git a/foo.png b/foo.png', b'deleted file mode 644', b'index f73e47d..0000000', b'Binary files a/foo.png and /dev/null differ' ], f.getvalue().splitlines()) def test_object_diff_kind_change(self): f = BytesIO() b1 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_object(b1) write_object_diff( f, store, (b"bar.txt", 0o644, b1.id), (b"bar.txt", 0o160000, b"06d0bdd9e2e20377b3180e4986b14c8549b393e4")) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'old file mode 644', b'new file mode 160000', b'index a116b51..06d0bdd 160000', b'--- a/bar.txt', b'+++ b/bar.txt', b'@@ -1,2 +1 @@', b'-new', b'-same', b'+Submodule commit 06d0bdd9e2e20377b3180e4986b14c8549b393e4', ], f.getvalue().splitlines()) diff --git a/dulwich/tests/test_porcelain.py b/dulwich/tests/test_porcelain.py index 89d41a27..ebef29a8 100644 --- a/dulwich/tests/test_porcelain.py +++ b/dulwich/tests/test_porcelain.py @@ -1,1875 +1,1875 @@ # test_porcelain.py -- porcelain tests # Copyright (C) 2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for dulwich.porcelain.""" from io import BytesIO try: from StringIO import StringIO except ImportError: from io import StringIO import errno import os import shutil import tarfile import tempfile import time from dulwich import porcelain from dulwich.diff_tree import tree_changes from dulwich.objects import ( Blob, Tag, Tree, ZERO_SHA, ) from dulwich.repo import ( NoIndexPresent, Repo, ) from dulwich.tests import ( TestCase, ) from dulwich.tests.utils import ( build_commit_graph, make_commit, make_object, ) from dulwich.tests.compat.utils import ( run_git_or_fail, ) def flat_walk_dir(dir_to_walk): for dirpath, _, filenames in os.walk(dir_to_walk): rel_dirpath = os.path.relpath(dirpath, dir_to_walk) if not dirpath == dir_to_walk: yield rel_dirpath for filename in filenames: if dirpath == dir_to_walk: yield filename else: yield os.path.join(rel_dirpath, filename) class PorcelainTestCase(TestCase): def setUp(self): super(PorcelainTestCase, self).setUp() self.test_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.test_dir) self.repo_path = os.path.join(self.test_dir, 'repo') self.repo = Repo.init(self.repo_path, mkdir=True) self.addCleanup(self.repo.close) class ArchiveTests(PorcelainTestCase): """Tests for the archive command.""" def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/master"] = c3.id out = BytesIO() err = BytesIO() porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, errstream=err) self.assertEqual(b"", err.getvalue()) tf = tarfile.TarFile(fileobj=out) self.addCleanup(tf.close) self.assertEqual([], tf.getnames()) class UpdateServerInfoTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id porcelain.update_server_info(self.repo.path) self.assertTrue(os.path.exists( os.path.join(self.repo.controldir(), 'info', 'refs'))) class CommitTests(PorcelainTestCase): def test_custom_author(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) def test_unicode(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit( self.repo.path, message="Some message", author="Joe ", committer="Bob ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class CleanTests(PorcelainTestCase): def put_files(self, tracked, ignored, untracked, empty_dirs): """Put the described files in the wd """ all_files = tracked | ignored | untracked for file_path in all_files: abs_path = os.path.join(self.repo.path, file_path) # File may need to be written in a dir that doesn't exist yet, so # create the parent dir(s) as necessary parent_dir = os.path.dirname(abs_path) try: os.makedirs(parent_dir) except OSError as err: if not err.errno == errno.EEXIST: raise err with open(abs_path, 'w') as f: f.write('') with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.writelines(ignored) for dir_path in empty_dirs: os.mkdir(os.path.join(self.repo.path, 'empty_dir')) files_to_add = [os.path.join(self.repo.path, t) for t in tracked] porcelain.add(repo=self.repo.path, paths=files_to_add) porcelain.commit(repo=self.repo.path, message="init commit") def assert_wd(self, expected_paths): """Assert paths of files and dirs in wd are same as expected_paths """ control_dir_rel = os.path.relpath( self.repo._controldir, self.repo.path) # normalize paths to simplify comparison across platforms found_paths = { os.path.normpath(p) for p in flat_walk_dir(self.repo.path) if not p.split(os.sep)[0] == control_dir_rel} norm_expected_paths = {os.path.normpath(p) for p in expected_paths} self.assertEqual(found_paths, norm_expected_paths) def test_from_root(self): self.put_files( tracked={ 'tracked_file', 'tracked_dir/tracked_file', '.gitignore'}, ignored={ 'ignored_file'}, untracked={ 'untracked_file', 'tracked_dir/untracked_dir/untracked_file', 'untracked_dir/untracked_dir/untracked_file'}, empty_dirs={ 'empty_dir'}) porcelain.clean(repo=self.repo.path, target_dir=self.repo.path) self.assert_wd({ 'tracked_file', 'tracked_dir/tracked_file', '.gitignore', 'ignored_file', 'tracked_dir'}) def test_from_subdir(self): self.put_files( tracked={ 'tracked_file', 'tracked_dir/tracked_file', '.gitignore'}, ignored={ 'ignored_file'}, untracked={ 'untracked_file', 'tracked_dir/untracked_dir/untracked_file', 'untracked_dir/untracked_dir/untracked_file'}, empty_dirs={ 'empty_dir'}) porcelain.clean( repo=self.repo, target_dir=os.path.join(self.repo.path, 'untracked_dir')) self.assert_wd({ 'tracked_file', 'tracked_dir/tracked_file', '.gitignore', 'ignored_file', 'untracked_file', 'tracked_dir/untracked_dir/untracked_file', 'empty_dir', 'untracked_dir', 'tracked_dir', 'tracked_dir/untracked_dir'}) class CloneTests(PorcelainTestCase): def test_simple_local(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id self.repo.refs[b"refs/tags/foo"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, checkout=False, errstream=errstream) self.addCleanup(r.close) self.assertEqual(r.path, target_path) target_repo = Repo(target_path) self.assertEqual(0, len(target_repo.open_index())) self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo']) self.assertTrue(b'f1' not in os.listdir(target_path)) self.assertTrue(b'f2' not in os.listdir(target_path)) c = r.get_config() encoded_path = self.repo.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode('utf-8') self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url')) self.assertEqual( b'+refs/heads/*:refs/remotes/origin/*', c.get((b'remote', b'origin'), b'fetch')) def test_simple_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone(self.repo.path, target_path, checkout=True, errstream=errstream) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: self.assertEqual(r.head(), c3.id) self.assertTrue('f1' in os.listdir(target_path)) self.assertTrue('f2' in os.listdir(target_path)) def test_bare_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone( self.repo.path, target_path, bare=True, errstream=errstream) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: r.head() self.assertRaises(NoIndexPresent, r.open_index) self.assertFalse(b'f1' in os.listdir(target_path)) self.assertFalse(b'f2' in os.listdir(target_path)) def test_no_checkout_with_bare(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id self.repo.refs[b"HEAD"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) self.assertRaises( ValueError, porcelain.clone, self.repo.path, target_path, checkout=True, bare=True, errstream=errstream) def test_no_head_no_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) errstream = BytesIO() r = porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream) r.close() def test_no_head_no_checkout_outstream_errstream_autofallback(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) errstream = porcelain.NoneStream() r = porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream) r.close() class InitTests(TestCase): def test_non_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir) def test_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir, bare=True) class AddTests(PorcelainTestCase): def test_add_default_paths(self): # create a file for initial commit fullpath = os.path.join(self.repo.path, 'blah') with open(fullpath, 'w') as f: f.write("\n") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test ', committer=b'test ') # Add a second test file and a file in a directory with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("\n") os.mkdir(os.path.join(self.repo.path, 'adir')) with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: f.write("\n") cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.add(self.repo.path) finally: os.chdir(cwd) # Check that foo was added and nothing in .git was modified index = self.repo.open_index() self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) def test_add_default_paths_subdir(self): os.mkdir(os.path.join(self.repo.path, 'foo')) with open(os.path.join(self.repo.path, 'blah'), 'w') as f: f.write("\n") with open(os.path.join(self.repo.path, 'foo', 'blie'), 'w') as f: f.write("\n") cwd = os.getcwd() try: os.chdir(os.path.join(self.repo.path, 'foo')) porcelain.add(repo=self.repo.path) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test ', committer=b'test ') finally: os.chdir(cwd) index = self.repo.open_index() self.assertEqual(sorted(index), [b'foo/blie']) def test_add_file(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) self.assertIn(b"foo", self.repo.open_index()) def test_add_ignored(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write("foo") with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") with open(os.path.join(self.repo.path, 'bar'), 'w') as f: f.write("BAR") (added, ignored) = porcelain.add(self.repo.path, paths=[ os.path.join(self.repo.path, "foo"), os.path.join(self.repo.path, "bar")]) self.assertIn(b"bar", self.repo.open_index()) self.assertEqual(set(['bar']), set(added)) self.assertEqual(set(['foo']), ignored) def test_add_file_absolute_path(self): # Absolute paths are (not yet) supported with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")]) self.assertIn(b"foo", self.repo.open_index()) def test_add_not_in_repo(self): with open(os.path.join(self.test_dir, 'foo'), 'w') as f: f.write("BAR") self.assertRaises( ValueError, porcelain.add, self.repo, paths=[os.path.join(self.test_dir, "foo")]) self.assertRaises( ValueError, porcelain.add, self.repo, paths=["../foo"]) self.assertEqual([], list(self.repo.open_index())) def test_add_file_clrf_conversion(self): # Set the right configuration to the repo c = self.repo.get_config() c.set("core", "autocrlf", "input") c.write_to_path() # Add a file with CRLF line-ending fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'wb') as f: f.write(b"line1\r\nline2") porcelain.add(self.repo.path, paths=[fullpath]) # The line-endings should have been converted to LF index = self.repo.open_index() self.assertIn(b"foo", index) entry = index[b"foo"] blob = self.repo[entry.sha] self.assertEqual(blob.data, b"line1\nline2") class RemoveTests(PorcelainTestCase): def test_remove_file(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo, message=b'test', author=b'test ', committer=b'test ') self.assertTrue(os.path.exists(os.path.join(self.repo.path, 'foo'))) cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(self.repo.path, paths=["foo"]) finally: os.chdir(cwd) self.assertFalse(os.path.exists(os.path.join(self.repo.path, 'foo'))) def test_remove_file_staged(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.add(self.repo.path, paths=[fullpath]) self.assertRaises(Exception, porcelain.rm, self.repo.path, paths=["foo"]) finally: os.chdir(cwd) class LogTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream) self.assertEqual(3, outstream.getvalue().count("-" * 50)) def test_max_entries(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream, max_entries=1) self.assertEqual(1, outstream.getvalue().count("-" * 50)) class ShowTests(PorcelainTestCase): def test_nolist(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=c3.id, outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_blob(self): b = Blob.from_string(b"The Foo\n") self.repo.object_store.add_object(b) outstream = StringIO() porcelain.show(self.repo.path, objects=[b.id], outstream=outstream) self.assertEqual(outstream.getvalue(), "The Foo\n") def test_commit_no_parent(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)]) outstream = StringIO() porcelain.show(self.repo.path, objects=[ca.id], outstream=outstream) self.assertMultiLineEqual(outstream.getvalue(), """\ -------------------------------------------------- commit: 344da06c1bb85901270b3e8875c988a027ec087d Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename new file mode 100644 -index 0000000..ea5c7bf 100644 +index 0000000..ea5c7bf --- /dev/null +++ b/somename @@ -0,0 +1 @@ +The Foo """) def test_tag(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)]) porcelain.tag_create( self.repo.path, b"tryme", b'foo ', b'bar', annotated=True, objectish=ca.id, tag_time=1552854211, tag_timezone=0) outstream = StringIO() porcelain.show(self.repo, objects=[b'refs/tags/tryme'], outstream=outstream) self.maxDiff = None self.assertMultiLineEqual(outstream.getvalue(), """\ Tagger: foo Date: Sun Mar 17 2019 20:23:31 +0000 bar -------------------------------------------------- commit: 344da06c1bb85901270b3e8875c988a027ec087d Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename new file mode 100644 -index 0000000..ea5c7bf 100644 +index 0000000..ea5c7bf --- /dev/null +++ b/somename @@ -0,0 +1 @@ +The Foo """) def test_commit_with_change(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) b = Blob.from_string(b"The Bar\n") tb = Tree() tb.add(b"somename", 0o100644, b.id) cb = make_commit(tree=tb.id, parents=[ca.id]) self.repo.object_store.add_objects( [(a, None), (b, None), (ta, None), (tb, None), (ca, None), (cb, None)]) outstream = StringIO() porcelain.show(self.repo.path, objects=[cb.id], outstream=outstream) self.assertMultiLineEqual(outstream.getvalue(), """\ -------------------------------------------------- commit: 2c6b6c9cb72c130956657e1fdae58e5b103744fa Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename index ea5c7bf..fd38bcb 100644 --- a/somename +++ b/somename @@ -1 +1 @@ -The Foo +The Bar """) class SymbolicRefTests(PorcelainTestCase): def test_set_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') def test_set_force_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) # test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') def test_set_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'master') def test_set_symbolic_ref_other_than_master(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]], attrs=dict(refs='develop')) self.repo.refs[b"HEAD"] = c3.id self.repo.refs[b"refs/heads/develop"] = c3.id porcelain.symbolic_ref(self.repo.path, b'develop') # test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/develop\n') class DiffTreeTests(PorcelainTestCase): def test_empty(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream) self.assertEqual(outstream.getvalue(), b"") def test_diff_apply(self): # Prepare the repository # Create some files and commit them file_list = ["to_exists", "to_modify", "to_delete"] for file in file_list: file_path = os.path.join(self.repo_path, file) # Touch the files with open(file_path, "w"): pass self.repo.stage(file_list) first_commit = self.repo.do_commit(b"The first commit") # Make a copy of the repository so we can apply the diff later copy_path = os.path.join(self.test_dir, "copy") shutil.copytree(self.repo_path, copy_path) # Do some changes with open(os.path.join(self.repo_path, "to_modify"), "w") as f: f.write("Modified!") os.remove(os.path.join(self.repo_path, "to_delete")) with open(os.path.join(self.repo_path, "to_add"), "w"): pass self.repo.stage(["to_modify", "to_delete", "to_add"]) second_commit = self.repo.do_commit(b"The second commit") # Get the patch first_tree = self.repo[first_commit].tree second_tree = self.repo[second_commit].tree outstream = BytesIO() porcelain.diff_tree(self.repo.path, first_tree, second_tree, outstream=outstream) # Save it on disk patch_path = os.path.join(self.test_dir, "patch.patch") with open(patch_path, "wb") as patch: patch.write(outstream.getvalue()) # And try to apply it to the copy directory git_command = ["-C", copy_path, "apply", patch_path] run_git_or_fail(git_command) # And now check that the files contents are exactly the same between # the two repositories original_files = set(os.listdir(self.repo_path)) new_files = set(os.listdir(copy_path)) # Check that we have the exact same files in both repositories assert original_files == new_files for file in original_files: if file == ".git": continue original_file_path = os.path.join(self.repo_path, file) copy_file_path = os.path.join(copy_path, file) assert os.path.isfile(copy_file_path) with open(original_file_path, "rb") as original_file: original_content = original_file.read() with open(copy_file_path, "rb") as copy_file: copy_content = copy_file.read() assert original_content == copy_content class CommitTreeTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) b = Blob() b.data = b"foo the bar" t = Tree() t.add(b"somename", 0o100644, b.id) self.repo.object_store.add_object(t) self.repo.object_store.add_object(b) sha = porcelain.commit_tree( self.repo.path, t.id, message=b"Withcommit.", author=b"Joe ", committer=b"Jane ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class RevListTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) outstream = BytesIO() porcelain.rev_list( self.repo.path, [c3.id], outstream=outstream) self.assertEqual( c3.id + b"\n" + c2.id + b"\n" + c1.id + b"\n", outstream.getvalue()) class TagCreateTests(PorcelainTestCase): def test_annotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b'refs/tags/tryme'] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar", tag.message) self.assertLess(time.time() - tag.tag_time, 5) def test_unannotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b'refs/tags/tryme'] self.assertEqual(list(tags.values()), [self.repo.head()]) def test_unannotated_unicode(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, "tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b'refs/tags/tryme'] self.assertEqual(list(tags.values()), [self.repo.head()]) class TagListTests(PorcelainTestCase): def test_empty(self): tags = porcelain.tag_list(self.repo.path) self.assertEqual([], tags) def test_simple(self): self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 tags = porcelain.tag_list(self.repo.path) self.assertEqual([b"bar/bla", b"foo"], tags) class TagDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.tag_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) porcelain.tag_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) class ResetTests(PorcelainTestCase): def test_hard_head(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: f.write(b"OOH") porcelain.reset(self.repo, "hard", b"HEAD") index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[b'HEAD'].tree)) self.assertEqual([], changes) def test_hard_commit(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) sha = porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(fullpath, 'wb') as f: f.write(b"BAZ") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(self.repo.path, message=b"Some other message", committer=b"Jane ", author=b"John ") porcelain.reset(self.repo, "hard", sha) index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[sha].tree)) self.assertEqual([], changes) class PushTests(PorcelainTestCase): def test_simple(self): """ Basic test of porcelain push where self.repo is the remote. First clone the remote, commit a file to the clone, then push the changes back to the remote. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'author ', committer=b'committer ') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) try: self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD']) finally: target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[fullpath]) porcelain.commit(repo=clone_path, message=b'push', author=b'author ', committer=b'committer ') # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b'HEAD'].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, errstream=errstream) # Check that the target and source with Repo(clone_path) as r_clone: self.assertEqual({ b'HEAD': new_id, b'refs/heads/foo': r_clone[b'HEAD'].id, b'refs/heads/master': new_id, }, self.repo.get_refs()) self.assertEqual(r_clone[b'HEAD'].id, self.repo[refs_path].id) # Get the change in the target repo corresponding to the add # this will be in the foo branch. change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, self.repo[b'refs/heads/foo'].tree))[0] self.assertEqual(os.path.basename(fullpath), change.new.path.decode('ascii')) def test_delete(self): """Basic test of porcelain push, removing a branch. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'author ', committer=b'committer ') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) target_repo.close() # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b'HEAD'].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push(clone_path, self.repo.path, b":" + refs_path, outstream=outstream, errstream=errstream) self.assertEqual({ b'HEAD': new_id, b'refs/heads/master': new_id, }, self.repo.get_refs()) class PullTests(PorcelainTestCase): def setUp(self): super(PullTests, self).setUp() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test ', committer=b'test ') # Setup target repo self.target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.target_path) target_repo = porcelain.clone(self.repo.path, target=self.target_path, errstream=BytesIO()) target_repo.close() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2 ', committer=b'test2 ') self.assertTrue(b'refs/heads/master' in self.repo.refs) self.assertTrue(b'refs/heads/master' in target_repo.refs) def test_simple(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master', outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) def test_no_refspec(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull(self.target_path, self.repo.path, outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) class StatusTests(PorcelainTestCase): def test_empty(self): results = porcelain.status(self.repo) self.assertEqual( {'add': [], 'delete': [], 'modify': []}, results.staged) self.assertEqual([], results.unstaged) def test_status_base(self): """Integration test for `status` functionality.""" # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') # modify access and modify time of path os.utime(fullpath, (0, 0)) with open(fullpath, 'wb') as f: f.write(b'stuff') # Make a dummy file and stage it filename_add = 'bar' fullpath = os.path.join(self.repo.path, filename_add) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) results = porcelain.status(self.repo) self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) self.assertEqual(results.unstaged, [b'foo']) def test_status_all(self): del_path = os.path.join(self.repo.path, 'foo') mod_path = os.path.join(self.repo.path, 'bar') add_path = os.path.join(self.repo.path, 'baz') us_path = os.path.join(self.repo.path, 'blye') ut_path = os.path.join(self.repo.path, 'blyat') with open(del_path, 'w') as f: f.write('origstuff') with open(mod_path, 'w') as f: f.write('origstuff') with open(us_path, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[del_path, mod_path, us_path]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') porcelain.remove(self.repo.path, [del_path]) with open(add_path, 'w') as f: f.write('origstuff') with open(mod_path, 'w') as f: f.write('more_origstuff') with open(us_path, 'w') as f: f.write('more_origstuff') porcelain.add(repo=self.repo.path, paths=[add_path, mod_path]) with open(us_path, 'w') as f: f.write('\norigstuff') with open(ut_path, 'w') as f: f.write('origstuff') results = porcelain.status(self.repo.path) self.assertDictEqual( {'add': [b'baz'], 'delete': [b'foo'], 'modify': [b'bar']}, results.staged) self.assertListEqual(results.unstaged, [b'blye']) self.assertListEqual(results.untracked, ['blyat']) def test_status_crlf_mismatch(self): # First make a commit as if the file has been added on a Linux system # or with core.autocrlf=True file_path = os.path.join(self.repo.path, 'crlf') with open(file_path, 'wb') as f: f.write(b'line1\nline2') porcelain.add(repo=self.repo.path, paths=[file_path]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') # Then update the file as if it was created by CGit on a Windows # system with core.autocrlf=true with open(file_path, 'wb') as f: f.write(b'line1\r\nline2') results = porcelain.status(self.repo) self.assertDictEqual( {'add': [], 'delete': [], 'modify': []}, results.staged) self.assertListEqual(results.unstaged, [b'crlf']) self.assertListEqual(results.untracked, []) def test_status_crlf_convert(self): # First make a commit as if the file has been added on a Linux system # or with core.autocrlf=True file_path = os.path.join(self.repo.path, 'crlf') with open(file_path, 'wb') as f: f.write(b'line1\nline2') porcelain.add(repo=self.repo.path, paths=[file_path]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') # Then update the file as if it was created by CGit on a Windows # system with core.autocrlf=true with open(file_path, 'wb') as f: f.write(b'line1\r\nline2') # TODO: It should be set automatically by looking at the configuration c = self.repo.get_config() c.set("core", "autocrlf", True) c.write_to_path() results = porcelain.status(self.repo) self.assertDictEqual( {'add': [], 'delete': [], 'modify': []}, results.staged) self.assertListEqual(results.unstaged, []) self.assertListEqual(results.untracked, []) def test_get_tree_changes_add(self): """Unit test for get_tree_changes add.""" # Make a dummy file, stage filename = 'bar' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['add'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 1) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_modify(self): """Unit test for get_tree_changes modify.""" # Make a dummy file, stage, commit, modify filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') with open(fullpath, 'w') as f: f.write('otherstuff') porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['modify'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 1) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_delete(self): """Unit test for get_tree_changes delete.""" # Make a dummy file, stage, commit, remove filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(repo=self.repo.path, paths=[filename]) finally: os.chdir(cwd) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['delete'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 1) def test_get_untracked_paths(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write('ignored\n') with open(os.path.join(self.repo.path, 'ignored'), 'w') as f: f.write('blah\n') with open(os.path.join(self.repo.path, 'notignored'), 'w') as f: f.write('blah\n') self.assertEqual( set(['ignored', 'notignored', '.gitignore']), set(porcelain.get_untracked_paths(self.repo.path, self.repo.path, self.repo.open_index()))) self.assertEqual(set(['.gitignore', 'notignored']), set(porcelain.status(self.repo).untracked)) self.assertEqual(set(['.gitignore', 'notignored', 'ignored']), set(porcelain.status(self.repo, ignored=True) .untracked)) def test_get_untracked_paths_nested(self): with open(os.path.join(self.repo.path, 'notignored'), 'w') as f: f.write('blah\n') subrepo = Repo.init(os.path.join(self.repo.path, 'nested'), mkdir=True) with open(os.path.join(subrepo.path, 'another'), 'w') as f: f.write('foo\n') self.assertEqual( set(['notignored']), set(porcelain.get_untracked_paths(self.repo.path, self.repo.path, self.repo.open_index()))) self.assertEqual( set(['another']), set(porcelain.get_untracked_paths(subrepo.path, subrepo.path, subrepo.open_index()))) # TODO(jelmer): Add test for dulwich.porcelain.daemon class UploadPackTests(PorcelainTestCase): """Tests for upload_pack.""" def test_upload_pack(self): outf = BytesIO() exitcode = porcelain.upload_pack( self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([b"0000"], outlines) self.assertEqual(0, exitcode) class ReceivePackTests(PorcelainTestCase): """Tests for receive_pack.""" def test_receive_pack(self): filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) self.repo.do_commit(message=b'test status', author=b'author ', committer=b'committer ', author_timestamp=1402354300, commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) outf = BytesIO() exitcode = porcelain.receive_pack( self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([ b'0091319b56ce3aee2d489f759736a79cc552c9bb86d9 HEAD\x00 report-status ' # noqa: E501 b'delete-refs quiet ofs-delta side-band-64k ' b'no-done symref=HEAD:refs/heads/master', b'003f319b56ce3aee2d489f759736a79cc552c9bb86d9 refs/heads/master', b'0000'], outlines) self.assertEqual(0, exitcode) class BranchListTests(PorcelainTestCase): def test_standard(self): self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchCreateTests(PorcelainTestCase): def test_branch_exists(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") porcelain.branch_create(self.repo, b"foo", force=True) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) def test_simple_unicode(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, 'foo') self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, 'foo') self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) class FetchTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test ', committer=b'test ') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2 ', committer=b'test2 ') self.assertFalse(self.repo[b'HEAD'].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, self.repo.path, outstream=outstream, errstream=errstream) # Assert that fetch updated the local image of the remote self.assert_correct_remote_refs( target_repo.get_refs(), self.repo.get_refs()) # Check the target repo for pushed changes with Repo(target_path) as r: self.assertTrue(self.repo[b'HEAD'].id in r) def test_with_remote_name(self): remote_name = b'origin' outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test ', committer=b'test ') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) # Capture current refs target_refs = target_repo.get_refs() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2 ', committer=b'test2 ') self.assertFalse(self.repo[b'HEAD'].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, self.repo.path, remote_name=remote_name, outstream=outstream, errstream=errstream) # Assert that fetch updated the local image of the remote self.assert_correct_remote_refs( target_repo.get_refs(), self.repo.get_refs()) # Check the target repo for pushed changes, as well as updates # for the refs with Repo(target_path) as r: self.assertTrue(self.repo[b'HEAD'].id in r) self.assertNotEqual(self.repo.get_refs(), target_refs) def assert_correct_remote_refs( self, local_refs, remote_refs, remote_name=b'origin'): """Assert that known remote refs corresponds to actual remote refs.""" local_ref_prefix = b'refs/heads' remote_ref_prefix = b'refs/remotes/' + remote_name locally_known_remote_refs = { k[len(remote_ref_prefix) + 1:]: v for k, v in local_refs.items() if k.startswith(remote_ref_prefix)} normalized_remote_refs = { k[len(local_ref_prefix) + 1:]: v for k, v in remote_refs.items() if k.startswith(local_ref_prefix)} self.assertEqual(locally_known_remote_refs, normalized_remote_refs) class RepackTests(PorcelainTestCase): def test_empty(self): porcelain.repack(self.repo) def test_simple(self): handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.repack(self.repo) class LsTreeTests(PorcelainTestCase): def test_empty(self): porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual(f.getvalue(), "") def test_simple(self): # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual( f.getvalue(), '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n') def test_recursive(self): # Create a directory then write a dummy file in it dirpath = os.path.join(self.repo.path, 'adir') filepath = os.path.join(dirpath, 'afile') os.mkdir(dirpath) with open(filepath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[filepath]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual( f.getvalue(), '40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f, recursive=True) self.assertEqual( f.getvalue(), '40000 tree b145cc69a5e17693e24d8a7be0016ed8075de66d\tadir\n' '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tadir' '/afile\n') class LsRemoteTests(PorcelainTestCase): def test_empty(self): self.assertEqual({}, porcelain.ls_remote(self.repo.path)) def test_some(self): cid = porcelain.commit(repo=self.repo.path, message=b'test status', author=b'author ', committer=b'committer ') self.assertEqual({ b'refs/heads/master': cid, b'HEAD': cid}, porcelain.ls_remote(self.repo.path)) class LsFilesTests(PorcelainTestCase): def test_empty(self): self.assertEqual([], list(porcelain.ls_files(self.repo))) def test_simple(self): # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[fullpath]) self.assertEqual([b'foo'], list(porcelain.ls_files(self.repo))) class RemoteAddTests(PorcelainTestCase): def test_new(self): porcelain.remote_add( self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') c = self.repo.get_config() self.assertEqual( c.get((b'remote', b'jelmer'), b'url'), b'git://jelmer.uk/code/dulwich') def test_exists(self): porcelain.remote_add( self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') self.assertRaises(porcelain.RemoteExists, porcelain.remote_add, self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') class CheckIgnoreTests(PorcelainTestCase): def test_check_ignored(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write('foo') foo_path = os.path.join(self.repo.path, 'foo') with open(foo_path, 'w') as f: f.write('BAR') bar_path = os.path.join(self.repo.path, 'bar') with open(bar_path, 'w') as f: f.write('BAR') self.assertEqual( ['foo'], list(porcelain.check_ignore(self.repo, [foo_path]))) self.assertEqual( [], list(porcelain.check_ignore(self.repo, [bar_path]))) def test_check_added_abs(self): path = os.path.join(self.repo.path, 'foo') with open(path, 'w') as f: f.write('BAR') self.repo.stage(['foo']) with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write('foo\n') self.assertEqual( [], list(porcelain.check_ignore(self.repo, [path]))) self.assertEqual( ['foo'], list(porcelain.check_ignore(self.repo, [path], no_index=True))) def test_check_added_rel(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write('BAR') self.repo.stage(['foo']) with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write('foo\n') cwd = os.getcwd() os.mkdir(os.path.join(self.repo.path, 'bar')) os.chdir(os.path.join(self.repo.path, 'bar')) try: self.assertEqual( list(porcelain.check_ignore(self.repo, ['../foo'])), []) self.assertEqual(['../foo'], list( porcelain.check_ignore(self.repo, ['../foo'], no_index=True))) finally: os.chdir(cwd) class UpdateHeadTests(PorcelainTestCase): def test_set_to_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah") self.assertEqual(c1.id, self.repo.head()) self.assertEqual(b'ref: refs/heads/blah', self.repo.refs.read_ref(b'HEAD')) def test_set_to_branch_detached(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah", detached=True) self.assertEqual(c1.id, self.repo.head()) self.assertEqual(c1.id, self.repo.refs.read_ref(b'HEAD')) def test_set_to_commit_detached(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, c1.id, detached=True) self.assertEqual(c1.id, self.repo.head()) self.assertEqual(c1.id, self.repo.refs.read_ref(b'HEAD')) def test_set_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo.refs[b"refs/heads/blah"] = c1.id porcelain.update_head(self.repo, "blah", new_branch="bar") self.assertEqual(c1.id, self.repo.head()) self.assertEqual(b'ref: refs/heads/bar', self.repo.refs.read_ref(b'HEAD')) class MailmapTests(PorcelainTestCase): def test_no_mailmap(self): self.assertEqual( b'Jelmer Vernooij ', porcelain.check_mailmap( self.repo, b'Jelmer Vernooij ')) def test_mailmap_lookup(self): with open(os.path.join(self.repo.path, '.mailmap'), 'wb') as f: f.write(b"""\ Jelmer Vernooij """) self.assertEqual( b'Jelmer Vernooij ', porcelain.check_mailmap( self.repo, b'Jelmer Vernooij ')) class FsckTests(PorcelainTestCase): def test_none(self): self.assertEqual( [], list(porcelain.fsck(self.repo))) def test_git_dir(self): obj = Tree() a = Blob() a.data = b"foo" obj.add(b".git", 0o100644, a.id) self.repo.object_store.add_objects( [(a, None), (obj, None)]) self.assertEqual( [(obj.id, 'invalid name .git')], [(sha, str(e)) for (sha, e) in porcelain.fsck(self.repo)]) class DescribeTests(PorcelainTestCase): def test_no_commits(self): self.assertRaises(KeyError, porcelain.describe, self.repo.path) def test_single_commit(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertEqual( 'g{}'.format(sha[:7].decode('ascii')), porcelain.describe(self.repo.path)) def test_tag(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) self.assertEqual( "tryme", porcelain.describe(self.repo.path)) def test_tag_and_commit(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) with open(fullpath, 'w') as f: f.write("BAR2") porcelain.add(repo=self.repo.path, paths=[fullpath]) sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertEqual( 'tryme-1-g{}'.format(sha[:7].decode('ascii')), porcelain.describe(self.repo.path)) class HelperTests(PorcelainTestCase): def test_path_to_tree_path_base(self): self.assertEqual( b'bar', porcelain.path_to_tree_path('/home/foo', '/home/foo/bar')) self.assertEqual(b'bar', porcelain.path_to_tree_path('.', './bar')) self.assertEqual(b'bar', porcelain.path_to_tree_path('.', 'bar')) cwd = os.getcwd() self.assertEqual( b'bar', porcelain.path_to_tree_path('.', os.path.join(cwd, 'bar'))) self.assertEqual(b'bar', porcelain.path_to_tree_path(cwd, 'bar')) def test_path_to_tree_path_syntax(self): self.assertEqual(b'bar', porcelain.path_to_tree_path(b'.', './bar')) self.assertEqual(b'bar', porcelain.path_to_tree_path('.', b'./bar')) self.assertEqual(b'bar', porcelain.path_to_tree_path(b'.', b'./bar')) def test_path_to_tree_path_error(self): with self.assertRaises(ValueError): porcelain.path_to_tree_path('/home/foo/', '/home/bar/baz') def test_path_to_tree_path_rel(self): cwd = os.getcwd() os.mkdir(os.path.join(self.repo.path, 'foo')) os.mkdir(os.path.join(self.repo.path, 'foo/bar')) try: os.chdir(os.path.join(self.repo.path, 'foo/bar')) self.assertEqual(b'bar/baz', porcelain.path_to_tree_path( '..', 'baz')) self.assertEqual(b'bar/baz', porcelain.path_to_tree_path( os.path.join(os.getcwd(), '..'), os.path.join(os.getcwd(), 'baz'))) self.assertEqual(b'bar/baz', porcelain.path_to_tree_path( '..', os.path.join(os.getcwd(), 'baz'))) self.assertEqual(b'bar/baz', porcelain.path_to_tree_path( os.path.join(os.getcwd(), '..'), 'baz')) finally: os.chdir(cwd) class GetObjectBypathTests(PorcelainTestCase): def test_simple(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertEqual( b"BAR", porcelain.get_object_by_path(self.repo, 'foo').data) def test_missing(self): self.assertRaises( KeyError, porcelain.get_object_by_path, self.repo, 'foo') class WriteTreeTests(PorcelainTestCase): def test_simple(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(repo=self.repo.path, paths=[fullpath]) self.assertEqual( b'd2092c8a9f311f0311083bf8d177f2ca0ab5b241', porcelain.write_tree(self.repo))