diff --git a/docs/tutorial/object-store.txt b/docs/tutorial/object-store.txt index c246ce2a..580b1748 100644 --- a/docs/tutorial/object-store.txt +++ b/docs/tutorial/object-store.txt @@ -1,187 +1,187 @@ .. _tutorial-object-store: The object store ================ The objects are stored in the ``object store`` of the repository. >>> from dulwich.repo import Repo >>> repo = Repo.init("myrepo", mkdir=True) Initial commit -------------- When you use Git, you generally add or modify content. As our repository is empty for now, we'll start by adding a new file:: >>> from dulwich.objects import Blob >>> blob = Blob.from_string(b"My file content\n") >>> print(blob.id.decode('ascii')) c55063a4d5d37aa1af2b2dad3a70aa34dae54dc6 Of course you could create a blob from an existing file using ``from_file`` instead. As said in the introduction, file content is separated from file name. Let's give this content a name:: >>> from dulwich.objects import Tree >>> tree = Tree() >>> tree.add(b"spam", 0o100644, blob.id) Note that "0o100644" is the octal form for a regular file with common permissions. You can hardcode them or you can use the ``stat`` module. The tree state of our repository still needs to be placed in time. That's the job of the commit:: >>> from dulwich.objects import Commit, parse_timezone >>> from time import time >>> commit = Commit() >>> commit.tree = tree.id >>> author = b"Your Name " >>> commit.author = commit.committer = author >>> commit.commit_time = commit.author_time = int(time()) >>> tz = parse_timezone(b'-0200')[0] >>> commit.commit_timezone = commit.author_timezone = tz >>> commit.encoding = b"UTF-8" >>> commit.message = b"Initial commit" Note that the initial commit has no parents. At this point, the repository is still empty because all operations happen in memory. Let's "commit" it. >>> object_store = repo.object_store >>> object_store.add_object(blob) Now the ".git/objects" folder contains a first SHA-1 file. Let's continue saving the changes:: >>> object_store.add_object(tree) >>> object_store.add_object(commit) Now the physical repository contains three objects but still has no branch. Let's create the master branch like Git would:: >>> repo.refs[b'refs/heads/master'] = commit.id The master branch now has a commit where to start. When we commit to master, we are also moving HEAD, which is Git's currently checked out branch: >>> head = repo.refs[b'HEAD'] >>> head == commit.id True >>> head == repo.refs[b'refs/heads/master'] True How did that work? As it turns out, HEAD is a special kind of ref called a symbolic ref, and it points at master. Most functions on the refs container work transparently with symbolic refs, but we can also take a peek inside HEAD: >>> import sys >>> print(repo.refs.read_ref(b'HEAD').decode(sys.getfilesystemencoding())) ref: refs/heads/master Normally, you won't need to use read_ref. If you want to change what ref HEAD points to, in order to check out another branch, just use set_symbolic_ref. Now our repository is officially tracking a branch named "master" referring to a single commit. Playing again with Git ---------------------- At this point you can come back to the shell, go into the "myrepo" folder and type ``git status`` to let Git confirm that this is a regular repository on branch "master". Git will tell you that the file "spam" is deleted, which is normal because Git is comparing the repository state with the current working copy. And we have absolutely no working copy using Dulwich because we don't need it at all! You can checkout the last state using ``git checkout -f``. The force flag will prevent Git from complaining that there are uncommitted changes in the working copy. The file ``spam`` appears and with no surprise contains the same bytes as the blob:: $ cat spam My file content Changing a File and Committing it --------------------------------- Now we have a first commit, the next one will show a difference. As seen in the introduction, it's about making a path in a tree point to a new blob. The old blob will remain to compute the diff. The tree is altered and the new commit'task is to point to this new version. Let's first build the blob:: >>> from dulwich.objects import Blob >>> spam = Blob.from_string(b"My new file content\n") >>> print(spam.id.decode('ascii')) 16ee2682887a962f854ebd25a61db16ef4efe49f An alternative is to alter the previously constructed blob object:: >>> blob.data = b"My new file content\n" >>> print(blob.id.decode('ascii')) 16ee2682887a962f854ebd25a61db16ef4efe49f In any case, update the blob id known as "spam". You also have the opportunity of changing its mode:: >>> tree[b"spam"] = (0o100644, spam.id) Now let's record the change:: >>> from dulwich.objects import Commit >>> from time import time >>> c2 = Commit() >>> c2.tree = tree.id >>> c2.parents = [commit.id] >>> c2.author = c2.committer = b"John Doe " >>> c2.commit_time = c2.author_time = int(time()) >>> c2.commit_timezone = c2.author_timezone = 0 >>> c2.encoding = b"UTF-8" >>> c2.message = b'Changing "spam"' In this new commit we record the changed tree id, and most important, the previous commit as the parent. Parents are actually a list because a commit may happen to have several parents after merging branches. Let's put the objects in the object store:: >>> repo.object_store.add_object(spam) >>> repo.object_store.add_object(tree) >>> repo.object_store.add_object(c2) You can already ask git to introspect this commit using ``git show`` and the value of ``c2.id`` as an argument. You'll see the difference will the previous blob recorded as "spam". The diff between the previous head and the new one can be printed using write_tree_diff:: >>> from dulwich.patch import write_tree_diff >>> import sys >>> write_tree_diff(sys.stdout, repo.object_store, commit.tree, tree.id) diff --git a/spam b/spam index c55063a..16ee268 100644 --- a/spam +++ b/spam - @@ -1,1 +1,1 @@ + @@ -1 +1 @@ -My file content +My new file content You won't see it using git log because the head is still the previous commit. It's easy to remedy:: >>> repo.refs[b'refs/heads/master'] = c2.id Now all git tools will work as expected. diff --git a/dulwich/patch.py b/dulwich/patch.py index b09669da..42599094 100644 --- a/dulwich/patch.py +++ b/dulwich/patch.py @@ -1,317 +1,344 @@ # patch.py -- For dealing with packed-style patches. # Copyright (C) 2009-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Classes for dealing with git am-style patches. These patches are basically unified diffs with some extra metadata tacked on. """ from difflib import SequenceMatcher import email.parser import time from dulwich.objects import ( Blob, Commit, S_ISGITLINK, ) FIRST_FEW_BYTES = 8000 def write_commit_patch(f, commit, contents, progress, version=None, encoding=None): """Write a individual file patch. :param commit: Commit object :param progress: Tuple with current patch number and total. :return: tuple with filename and contents """ encoding = encoding or getattr(f, "encoding", "ascii") if isinstance(contents, str): contents = contents.encode(encoding) (num, total) = progress f.write(b"From " + commit.id + b" " + time.ctime(commit.commit_time).encode(encoding) + b"\n") f.write(b"From: " + commit.author + b"\n") f.write(b"Date: " + time.strftime("%a, %d %b %Y %H:%M:%S %Z").encode(encoding) + b"\n") f.write(("Subject: [PATCH %d/%d] " % (num, total)).encode(encoding) + commit.message + b"\n") f.write(b"\n") f.write(b"---\n") try: import subprocess p = subprocess.Popen(["diffstat"], stdout=subprocess.PIPE, stdin=subprocess.PIPE) except (ImportError, OSError): pass # diffstat not available? else: (diffstat, _) = p.communicate(contents) f.write(diffstat) f.write(b"\n") f.write(contents) f.write(b"-- \n") if version is None: from dulwich import __version__ as dulwich_version f.write(b"Dulwich %d.%d.%d\n" % dulwich_version) else: f.write(version.encode(encoding) + b"\n") def get_summary(commit): """Determine the summary line for use in a filename. :param commit: Commit :return: Summary string """ return commit.message.splitlines()[0].replace(" ", "-") -def unified_diff(a, b, fromfile, tofile, n=3): - """difflib.unified_diff that doesn't write any dates or trailing spaces. - - Based on the same function in Python2.6.5-rc2's difflib.py - """ +# Unified Diff +def _format_range_unified(start, stop): + 'Convert range to the "ed" format' + # Per the diff spec at http://www.unix.org/single_unix_specification/ + beginning = start + 1 # lines start numbering with one + length = stop - start + if length == 1: + return '{}'.format(beginning) + if not length: + beginning -= 1 # empty ranges begin at line just before the range + return '{},{}'.format(beginning, length) + + +def unified_diff(a, b, fromfile='', tofile='', fromfiledate='', + tofiledate='', n=3, lineterm='\n'): started = False for group in SequenceMatcher(None, a, b).get_grouped_opcodes(n): if not started: - yield b'--- ' + fromfile + b'\n' - yield b'+++ ' + tofile + b'\n' started = True - i1, i2, j1, j2 = group[0][1], group[-1][2], group[0][3], group[-1][4] - sizes = "@@ -%d,%d +%d,%d @@\n" % (i1+1, i2-i1, j1+1, j2-j1) - yield sizes.encode('ascii') + fromdate = '\t{}'.format(fromfiledate) if fromfiledate else '' + todate = '\t{}'.format(tofiledate) if tofiledate else '' + yield '--- {}{}{}'.format( + fromfile.decode("ascii"), + fromdate, + lineterm + ).encode('ascii') + yield '+++ {}{}{}'.format( + tofile.decode("ascii"), + todate, + lineterm + ).encode('ascii') + + first, last = group[0], group[-1] + file1_range = _format_range_unified(first[1], last[2]) + file2_range = _format_range_unified(first[3], last[4]) + yield '@@ -{} +{} @@{}'.format( + file1_range, + file2_range, + lineterm + ).encode('ascii') + for tag, i1, i2, j1, j2 in group: if tag == 'equal': for line in a[i1:i2]: yield b' ' + line continue - if tag == 'replace' or tag == 'delete': + if tag in ('replace', 'delete'): for line in a[i1:i2]: if not line[-1:] == b'\n': line += b'\n\\ No newline at end of file\n' yield b'-' + line - if tag == 'replace' or tag == 'insert': + if tag in ('replace', 'insert'): for line in b[j1:j2]: if not line[-1:] == b'\n': line += b'\n\\ No newline at end of file\n' yield b'+' + line def is_binary(content): """See if the first few bytes contain any null characters. :param content: Bytestring to check for binary content """ return b'\0' in content[:FIRST_FEW_BYTES] def shortid(hexsha): if hexsha is None: return b"0" * 7 else: return hexsha[:7] def patch_filename(p, root): if p is None: return b"/dev/null" else: return root + b"/" + p def write_object_diff(f, store, old_file, new_file, diff_binary=False): """Write the diff for an object. :param f: File-like object to write to :param store: Store to retrieve objects from, if necessary :param old_file: (path, mode, hexsha) tuple :param new_file: (path, mode, hexsha) tuple :param diff_binary: Whether to diff files even if they are considered binary files by is_binary(). :note: the tuple elements should be None for nonexistant files """ (old_path, old_mode, old_id) = old_file (new_path, new_mode, new_id) = new_file old_path = patch_filename(old_path, b"a") new_path = patch_filename(new_path, b"b") def content(mode, hexsha): if hexsha is None: return Blob.from_string(b'') elif S_ISGITLINK(mode): return Blob.from_string(b"Submodule commit " + hexsha + b"\n") else: return store[hexsha] def lines(content): if not content: return [] else: return content.splitlines() f.writelines(gen_diff_header( (old_path, new_path), (old_mode, new_mode), (old_id, new_id))) old_content = content(old_mode, old_id) new_content = content(new_mode, new_id) if not diff_binary and ( is_binary(old_content.data) or is_binary(new_content.data)): f.write(b"Binary files " + old_path + b" and " + new_path + b" differ\n") else: f.writelines(unified_diff(lines(old_content), lines(new_content), old_path, new_path)) # TODO(jelmer): Support writing unicode, rather than bytes. def gen_diff_header(paths, modes, shas): """Write a blob diff header. :param paths: Tuple with old and new path :param modes: Tuple with old and new modes :param shas: Tuple with old and new shas """ (old_path, new_path) = paths (old_mode, new_mode) = modes (old_sha, new_sha) = shas yield b"diff --git " + old_path + b" " + new_path + b"\n" if old_mode != new_mode: if new_mode is not None: if old_mode is not None: yield ("old mode %o\n" % old_mode).encode('ascii') yield ("new mode %o\n" % new_mode).encode('ascii') else: yield ("deleted mode %o\n" % old_mode).encode('ascii') yield b"index " + shortid(old_sha) + b".." + shortid(new_sha) if new_mode is not None: yield (" %o" % new_mode).encode('ascii') yield b"\n" # TODO(jelmer): Support writing unicode, rather than bytes. def write_blob_diff(f, old_file, new_file): """Write blob diff. :param f: File-like object to write to :param old_file: (path, mode, hexsha) tuple (None if nonexisting) :param new_file: (path, mode, hexsha) tuple (None if nonexisting) :note: The use of write_object_diff is recommended over this function. """ (old_path, old_mode, old_blob) = old_file (new_path, new_mode, new_blob) = new_file old_path = patch_filename(old_path, b"a") new_path = patch_filename(new_path, b"b") def lines(blob): if blob is not None: return blob.splitlines() else: return [] f.writelines(gen_diff_header( (old_path, new_path), (old_mode, new_mode), (getattr(old_blob, "id", None), getattr(new_blob, "id", None)))) old_contents = lines(old_blob) new_contents = lines(new_blob) f.writelines(unified_diff(old_contents, new_contents, old_path, new_path)) def write_tree_diff(f, store, old_tree, new_tree, diff_binary=False): """Write tree diff. :param f: File-like object to write to. :param old_tree: Old tree id :param new_tree: New tree id :param diff_binary: Whether to diff files even if they are considered binary files by is_binary(). """ changes = store.tree_changes(old_tree, new_tree) for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes: write_object_diff(f, store, (oldpath, oldmode, oldsha), (newpath, newmode, newsha), diff_binary=diff_binary) def git_am_patch_split(f, encoding=None): """Parse a git-am-style patch and split it up into bits. :param f: File-like object to parse :param encoding: Encoding to use when creating Git objects :return: Tuple with commit object, diff contents and git version """ encoding = encoding or getattr(f, "encoding", "ascii") contents = f.read() if (isinstance(contents, bytes) and getattr(email.parser, "BytesParser", None)): parser = email.parser.BytesParser() msg = parser.parsebytes(contents) else: parser = email.parser.Parser() msg = parser.parsestr(contents) return parse_patch_message(msg, encoding) def parse_patch_message(msg, encoding=None): """Extract a Commit object and patch from an e-mail message. :param msg: An email message (email.message.Message) :param encoding: Encoding to use to encode Git commits :return: Tuple with commit object, diff contents and git version """ c = Commit() c.author = msg["from"].encode(encoding) c.committer = msg["from"].encode(encoding) try: patch_tag_start = msg["subject"].index("[PATCH") except ValueError: subject = msg["subject"] else: close = msg["subject"].index("] ", patch_tag_start) subject = msg["subject"][close+2:] c.message = (subject.replace("\n", "") + "\n").encode(encoding) first = True body = msg.get_payload(decode=True) lines = body.splitlines(True) line_iter = iter(lines) for l in line_iter: if l == b"---\n": break if first: if l.startswith(b"From: "): c.author = l[len(b"From: "):].rstrip() else: c.message += b"\n" + l first = False else: c.message += l diff = b"" for l in line_iter: if l == b"-- \n": break diff += l try: version = next(line_iter).rstrip(b"\n") except StopIteration: version = None return c, diff, version diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py index b71f90e4..2a4b0705 100644 --- a/dulwich/tests/test_patch.py +++ b/dulwich/tests/test_patch.py @@ -1,539 +1,539 @@ # test_patch.py -- tests for patch.py # Copyright (C) 2010 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for patch.py.""" from io import BytesIO, StringIO from dulwich.objects import ( Blob, Commit, S_IFGITLINK, Tree, ) from dulwich.object_store import ( MemoryObjectStore, ) from dulwich.patch import ( git_am_patch_split, write_blob_diff, write_commit_patch, write_object_diff, write_tree_diff, ) from dulwich.tests import ( SkipTest, TestCase, ) class WriteCommitPatchTests(TestCase): def test_simple_bytesio(self): f = BytesIO() c = Commit() c.committer = c.author = b"Jelmer " c.commit_time = c.author_time = 1271350201 c.commit_timezone = c.author_timezone = 0 c.message = b"This is the first line\nAnd this is the second line.\n" c.tree = Tree().id write_commit_patch(f, c, b"CONTENTS", (1, 1), version="custom") f.seek(0) lines = f.readlines() self.assertTrue(lines[0].startswith( b"From 0b0d34d1b5b596c928adc9a727a4b9e03d025298")) self.assertEqual(lines[1], b"From: Jelmer \n") self.assertTrue(lines[2].startswith(b"Date: ")) self.assertEqual([ b"Subject: [PATCH 1/1] This is the first line\n", b"And this is the second line.\n", b"\n", b"\n", b"---\n"], lines[3:8]) self.assertEqual([ b"CONTENTS-- \n", b"custom\n"], lines[-2:]) if len(lines) >= 12: # diffstat may not be present self.assertEqual(lines[8], b" 0 files changed\n") class ReadGitAmPatch(TestCase): def test_extract_string(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning). --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split( StringIO(text.decode("utf-8")), "utf-8") self.assertEqual(b"Jelmer Vernooij ", c.committer) self.assertEqual(b"Jelmer Vernooij ", c.author) self.assertEqual(b"Remove executable bit from prey.ico " b"(triggers a warning).\n", c.message) self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """, diff) self.assertEqual(b"1.7.0.4", version) def test_extract_bytes(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [PATCH 1/2] Remove executable bit from prey.ico (triggers a warning). --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text)) self.assertEqual(b"Jelmer Vernooij ", c.committer) self.assertEqual(b"Jelmer Vernooij ", c.author) self.assertEqual(b"Remove executable bit from prey.ico " b"(triggers a warning).\n", c.message) self.assertEqual(b""" pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """, diff) self.assertEqual(b"1.7.0.4", version) def test_extract_spaces(self): text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(b'''\ Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. ''', c.message) def test_extract_pseudo_from_header(self): text = b"""From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. From: Jelmer Vernooy * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico -- 1.7.0.4 """ # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(b"Jelmer Vernooy ", c.author) self.assertEqual(b'''\ Added unit tests for dulwich.object_store.tree_lookup_path. * dulwich/tests/test_object_store.py (TreeLookupPathTests): This test case contains a few tests that ensure the tree_lookup_path function works as expected. ''', c.message) def test_extract_no_version_tail(self): text = b"""\ From ff643aae102d8870cac88e8f007e70f58f3a7363 Mon Sep 17 00:00:00 2001 From: Jelmer Vernooij Date: Thu, 15 Apr 2010 15:40:28 +0200 Subject: [Dulwich-users] [PATCH] Added unit tests for dulwich.object_store.tree_lookup_path. From: Jelmer Vernooy --- pixmaps/prey.ico | Bin 9662 -> 9662 bytes 1 files changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 pixmaps/prey.ico """ c, diff, version = git_am_patch_split(BytesIO(text), "utf-8") self.assertEqual(None, version) def test_extract_mercurial(self): raise SkipTest( "git_am_patch_split doesn't handle Mercurial patches " "properly yet") expected_diff = """\ diff --git a/dulwich/tests/test_patch.py b/dulwich/tests/test_patch.py --- a/dulwich/tests/test_patch.py +++ b/dulwich/tests/test_patch.py @@ -158,7 +158,7 @@ ''' c, diff, version = git_am_patch_split(BytesIO(text)) - self.assertIs(None, version) + self.assertEqual(None, version) class DiffTests(TestCase): """ # noqa: W291,W293 text = """\ From dulwich-users-bounces+jelmer=samba.org@lists.launchpad.net \ Mon Nov 29 00:58:18 2010 Date: Sun, 28 Nov 2010 17:57:27 -0600 From: Augie Fackler To: dulwich-users Subject: [Dulwich-users] [PATCH] test_patch: fix tests on Python 2.6 Content-Transfer-Encoding: 8bit Change-Id: I5e51313d4ae3a65c3f00c665002a7489121bb0d6 %s _______________________________________________ Mailing list: https://launchpad.net/~dulwich-users Post to : dulwich-users@lists.launchpad.net Unsubscribe : https://launchpad.net/~dulwich-users More help : https://help.launchpad.net/ListHelp """ % expected_diff # noqa: W291 c, diff, version = git_am_patch_split(BytesIO(text)) self.assertEqual(expected_diff, diff) self.assertEqual(None, version) class DiffTests(TestCase): """Tests for write_blob_diff and write_tree_diff.""" def test_blob_diff(self): f = BytesIO() write_blob_diff( f, (b"foo.txt", 0o644, Blob.from_string(b"old\nsame\n")), (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n"))) self.assertEqual([ b"diff --git a/foo.txt b/bar.txt", b"index 3b0f961..a116b51 644", b"--- a/foo.txt", b"+++ b/bar.txt", b"@@ -1,2 +1,2 @@", b"-old", b"+new", b" same" ], f.getvalue().splitlines()) def test_blob_add(self): f = BytesIO() write_blob_diff( f, (None, None, None), (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n"))) self.assertEqual([ b'diff --git /dev/null b/bar.txt', b'new mode 644', b'index 0000000..a116b51 644', b'--- /dev/null', b'+++ b/bar.txt', - b'@@ -1,0 +1,2 @@', + b'@@ -0,0 +1,2 @@', b'+new', b'+same' ], f.getvalue().splitlines()) def test_blob_remove(self): f = BytesIO() write_blob_diff( f, (b"bar.txt", 0o644, Blob.from_string(b"new\nsame\n")), (None, None, None)) self.assertEqual([ b'diff --git a/bar.txt /dev/null', b'deleted mode 644', b'index a116b51..0000000', b'--- a/bar.txt', b'+++ /dev/null', - b'@@ -1,2 +1,0 @@', + b'@@ -1,2 +0,0 @@', b'-new', b'-same' ], f.getvalue().splitlines()) def test_tree_diff(self): f = BytesIO() store = MemoryObjectStore() added = Blob.from_string(b"add\n") removed = Blob.from_string(b"removed\n") changed1 = Blob.from_string(b"unchanged\nremoved\n") changed2 = Blob.from_string(b"unchanged\nadded\n") unchanged = Blob.from_string(b"unchanged\n") tree1 = Tree() tree1.add(b"removed.txt", 0o644, removed.id) tree1.add(b"changed.txt", 0o644, changed1.id) tree1.add(b"unchanged.txt", 0o644, changed1.id) tree2 = Tree() tree2.add(b"added.txt", 0o644, added.id) tree2.add(b"changed.txt", 0o644, changed2.id) tree2.add(b"unchanged.txt", 0o644, changed1.id) store.add_objects([(o, None) for o in [ tree1, tree2, added, removed, changed1, changed2, unchanged]]) write_tree_diff(f, store, tree1.id, tree2.id) self.assertEqual([ b'diff --git /dev/null b/added.txt', b'new mode 644', b'index 0000000..76d4bb8 644', b'--- /dev/null', b'+++ b/added.txt', - b'@@ -1,0 +1,1 @@', + b'@@ -0,0 +1 @@', b'+add', b'diff --git a/changed.txt b/changed.txt', b'index bf84e48..1be2436 644', b'--- a/changed.txt', b'+++ b/changed.txt', b'@@ -1,2 +1,2 @@', b' unchanged', b'-removed', b'+added', b'diff --git a/removed.txt /dev/null', b'deleted mode 644', b'index 2c3f0b3..0000000', b'--- a/removed.txt', b'+++ /dev/null', - b'@@ -1,1 +1,0 @@', + b'@@ -1 +0,0 @@', b'-removed', ], f.getvalue().splitlines()) def test_tree_diff_submodule(self): f = BytesIO() store = MemoryObjectStore() tree1 = Tree() tree1.add(b"asubmodule", S_IFGITLINK, b"06d0bdd9e2e20377b3180e4986b14c8549b393e4") tree2 = Tree() tree2.add(b"asubmodule", S_IFGITLINK, b"cc975646af69f279396d4d5e1379ac6af80ee637") store.add_objects([(o, None) for o in [tree1, tree2]]) write_tree_diff(f, store, tree1.id, tree2.id) self.assertEqual([ b'diff --git a/asubmodule b/asubmodule', b'index 06d0bdd..cc97564 160000', b'--- a/asubmodule', b'+++ b/asubmodule', - b'@@ -1,1 +1,1 @@', + b'@@ -1 +1 @@', b'-Submodule commit 06d0bdd9e2e20377b3180e4986b14c8549b393e4', b'+Submodule commit cc975646af69f279396d4d5e1379ac6af80ee637', ], f.getvalue().splitlines()) def test_object_diff_blob(self): f = BytesIO() b1 = Blob.from_string(b"old\nsame\n") b2 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff(f, store, (b"foo.txt", 0o644, b1.id), (b"bar.txt", 0o644, b2.id)) self.assertEqual([ b"diff --git a/foo.txt b/bar.txt", b"index 3b0f961..a116b51 644", b"--- a/foo.txt", b"+++ b/bar.txt", b"@@ -1,2 +1,2 @@", b"-old", b"+new", b" same" ], f.getvalue().splitlines()) def test_object_diff_add_blob(self): f = BytesIO() store = MemoryObjectStore() b2 = Blob.from_string(b"new\nsame\n") store.add_object(b2) write_object_diff(f, store, (None, None, None), (b"bar.txt", 0o644, b2.id)) self.assertEqual([ b'diff --git /dev/null b/bar.txt', b'new mode 644', b'index 0000000..a116b51 644', b'--- /dev/null', b'+++ b/bar.txt', - b'@@ -1,0 +1,2 @@', + b'@@ -0,0 +1,2 @@', b'+new', b'+same' ], f.getvalue().splitlines()) def test_object_diff_remove_blob(self): f = BytesIO() b1 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_object(b1) write_object_diff(f, store, (b"bar.txt", 0o644, b1.id), (None, None, None)) self.assertEqual([ b'diff --git a/bar.txt /dev/null', b'deleted mode 644', b'index a116b51..0000000', b'--- a/bar.txt', b'+++ /dev/null', - b'@@ -1,2 +1,0 @@', + b'@@ -1,2 +0,0 @@', b'-new', b'-same' ], f.getvalue().splitlines()) def test_object_diff_bin_blob_force(self): f = BytesIO() # Prepare two slightly different PNG headers b1 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x04\x00\x00\x00\x05\x04\x8b") b2 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x03\x00\x00\x00\x98\xd3\xb3") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff( f, store, (b'foo.png', 0o644, b1.id), (b'bar.png', 0o644, b2.id), diff_binary=True) self.assertEqual([ b'diff --git a/foo.png b/bar.png', b'index f73e47d..06364b7 644', b'--- a/foo.png', b'+++ b/bar.png', b'@@ -1,4 +1,4 @@', b' \x89PNG', b' \x1a', b' \x00\x00\x00', b'-IHDR\x00\x00\x01\xd5\x00\x00\x00' b'\x9f\x08\x04\x00\x00\x00\x05\x04\x8b', b'\\ No newline at end of file', b'+IHDR\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x03\x00\x00\x00\x98\xd3\xb3', b'\\ No newline at end of file' ], f.getvalue().splitlines()) def test_object_diff_bin_blob(self): f = BytesIO() # Prepare two slightly different PNG headers b1 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x04\x00\x00\x00\x05\x04\x8b") b2 = Blob.from_string( b"\x89\x50\x4e\x47\x0d\x0a\x1a\x0a" b"\x00\x00\x00\x0d\x49\x48\x44\x52" b"\x00\x00\x01\xd5\x00\x00\x00\x9f" b"\x08\x03\x00\x00\x00\x98\xd3\xb3") store = MemoryObjectStore() store.add_objects([(b1, None), (b2, None)]) write_object_diff(f, store, (b'foo.png', 0o644, b1.id), (b'bar.png', 0o644, b2.id)) self.assertEqual([ b'diff --git a/foo.png b/bar.png', b'index f73e47d..06364b7 644', b'Binary files a/foo.png and b/bar.png differ' ], f.getvalue().splitlines()) def test_object_diff_add_bin_blob(self): f = BytesIO() b2 = Blob.from_string( b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a' b'\x00\x00\x00\x0d\x49\x48\x44\x52' b'\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x03\x00\x00\x00\x98\xd3\xb3') store = MemoryObjectStore() store.add_object(b2) write_object_diff(f, store, (None, None, None), (b'bar.png', 0o644, b2.id)) self.assertEqual([ b'diff --git /dev/null b/bar.png', b'new mode 644', b'index 0000000..06364b7 644', b'Binary files /dev/null and b/bar.png differ' ], f.getvalue().splitlines()) def test_object_diff_remove_bin_blob(self): f = BytesIO() b1 = Blob.from_string( b'\x89\x50\x4e\x47\x0d\x0a\x1a\x0a' b'\x00\x00\x00\x0d\x49\x48\x44\x52' b'\x00\x00\x01\xd5\x00\x00\x00\x9f' b'\x08\x04\x00\x00\x00\x05\x04\x8b') store = MemoryObjectStore() store.add_object(b1) write_object_diff(f, store, (b'foo.png', 0o644, b1.id), (None, None, None)) self.assertEqual([ b'diff --git a/foo.png /dev/null', b'deleted mode 644', b'index f73e47d..0000000', b'Binary files a/foo.png and /dev/null differ' ], f.getvalue().splitlines()) def test_object_diff_kind_change(self): f = BytesIO() b1 = Blob.from_string(b"new\nsame\n") store = MemoryObjectStore() store.add_object(b1) write_object_diff( f, store, (b"bar.txt", 0o644, b1.id), (b"bar.txt", 0o160000, b"06d0bdd9e2e20377b3180e4986b14c8549b393e4")) self.assertEqual([ b'diff --git a/bar.txt b/bar.txt', b'old mode 644', b'new mode 160000', b'index a116b51..06d0bdd 160000', b'--- a/bar.txt', b'+++ b/bar.txt', - b'@@ -1,2 +1,1 @@', + b'@@ -1,2 +1 @@', b'-new', b'-same', b'+Submodule commit 06d0bdd9e2e20377b3180e4986b14c8549b393e4', ], f.getvalue().splitlines()) diff --git a/dulwich/tests/test_porcelain.py b/dulwich/tests/test_porcelain.py index 6a24f87a..adcf027a 100644 --- a/dulwich/tests/test_porcelain.py +++ b/dulwich/tests/test_porcelain.py @@ -1,1136 +1,1136 @@ # test_porcelain.py -- porcelain tests # Copyright (C) 2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for dulwich.porcelain.""" from io import BytesIO try: from StringIO import StringIO except ImportError: from io import StringIO import os import shutil import tarfile import tempfile import time from dulwich import porcelain from dulwich.diff_tree import tree_changes from dulwich.objects import ( Blob, Tag, Tree, ZERO_SHA, ) from dulwich.repo import Repo from dulwich.tests import ( TestCase, ) from dulwich.tests.utils import ( build_commit_graph, make_commit, make_object, ) class PorcelainTestCase(TestCase): def setUp(self): super(PorcelainTestCase, self).setUp() repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) self.repo = Repo.init(repo_dir) def tearDown(self): super(PorcelainTestCase, self).tearDown() self.repo.close() class ArchiveTests(PorcelainTestCase): """Tests for the archive command.""" def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/master"] = c3.id out = BytesIO() err = BytesIO() porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, errstream=err) self.assertEqual(b"", err.getvalue()) tf = tarfile.TarFile(fileobj=out) self.addCleanup(tf.close) self.assertEqual([], tf.getnames()) class UpdateServerInfoTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id porcelain.update_server_info(self.repo.path) self.assertTrue(os.path.exists( os.path.join(self.repo.controldir(), 'info', 'refs'))) class CommitTests(PorcelainTestCase): def test_custom_author(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit( self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class CloneTests(PorcelainTestCase): def test_simple_local(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id self.repo.refs[b"refs/tags/foo"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, checkout=False, errstream=errstream) self.assertEqual(r.path, target_path) target_repo = Repo(target_path) self.assertEqual(target_repo.head(), c3.id) self.assertEqual(c3.id, target_repo.refs[b'refs/tags/foo']) self.assertTrue(b'f1' not in os.listdir(target_path)) self.assertTrue(b'f2' not in os.listdir(target_path)) c = r.get_config() encoded_path = self.repo.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode('utf-8') self.assertEqual(encoded_path, c.get((b'remote', b'origin'), b'url')) self.assertEqual( b'+refs/heads/*:refs/remotes/origin/*', c.get((b'remote', b'origin'), b'fetch')) def test_simple_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone(self.repo.path, target_path, checkout=True, errstream=errstream) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: self.assertEqual(r.head(), c3.id) self.assertTrue('f1' in os.listdir(target_path)) self.assertTrue('f2' in os.listdir(target_path)) def test_bare_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with porcelain.clone( self.repo.path, target_path, bare=True, errstream=errstream) as r: self.assertEqual(r.path, target_path) with Repo(target_path) as r: self.assertRaises(KeyError, r.head) self.assertFalse(b'f1' in os.listdir(target_path)) self.assertFalse(b'f2' in os.listdir(target_path)) def test_no_checkout_with_bare(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id self.repo.refs[b"HEAD"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) self.assertRaises( ValueError, porcelain.clone, self.repo.path, target_path, checkout=True, bare=True, errstream=errstream) def test_no_head_no_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) errstream = BytesIO() r = porcelain.clone( self.repo.path, target_path, checkout=True, errstream=errstream) r.close() class InitTests(TestCase): def test_non_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir) def test_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir, bare=True) class AddTests(PorcelainTestCase): def test_add_default_paths(self): # create a file for initial commit fullpath = os.path.join(self.repo.path, 'blah') with open(fullpath, 'w') as f: f.write("\n") porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Add a second test file and a file in a directory with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("\n") os.mkdir(os.path.join(self.repo.path, 'adir')) with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: f.write("\n") cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.add(self.repo.path) finally: os.chdir(cwd) # Check that foo was added and nothing in .git was modified index = self.repo.open_index() self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) def test_add_default_paths_subdir(self): os.mkdir(os.path.join(self.repo.path, 'foo')) with open(os.path.join(self.repo.path, 'blah'), 'w') as f: f.write("\n") with open(os.path.join(self.repo.path, 'foo', 'blie'), 'w') as f: f.write("\n") cwd = os.getcwd() try: os.chdir(os.path.join(self.repo.path, 'foo')) porcelain.add(repo=self.repo.path) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') finally: os.chdir(cwd) index = self.repo.open_index() self.assertEqual(sorted(index), [b'foo/blie']) def test_add_file(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) self.assertIn(b"foo", self.repo.open_index()) def test_add_ignored(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write("foo") with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") with open(os.path.join(self.repo.path, 'bar'), 'w') as f: f.write("BAR") (added, ignored) = porcelain.add(self.repo.path, paths=[ os.path.join(self.repo.path, "foo"), os.path.join(self.repo.path, "bar")]) self.assertIn(b"bar", self.repo.open_index()) self.assertEqual(set(['bar']), set(added)) self.assertEqual(set(['foo']), ignored) def test_add_file_absolute_path(self): # Absolute paths are (not yet) supported with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo, paths=[os.path.join(self.repo.path, "foo")]) self.assertIn(b"foo", self.repo.open_index()) class RemoveTests(PorcelainTestCase): def test_remove_file(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo, message=b'test', author=b'test', committer=b'test') self.assertTrue(os.path.exists(os.path.join(self.repo.path, 'foo'))) cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(self.repo.path, paths=["foo"]) finally: os.chdir(cwd) self.assertFalse(os.path.exists(os.path.join(self.repo.path, 'foo'))) def test_remove_file_staged(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.add(self.repo.path, paths=[fullpath]) self.assertRaises(Exception, porcelain.rm, self.repo.path, paths=["foo"]) finally: os.chdir(cwd) class LogTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream) self.assertEqual(3, outstream.getvalue().count("-" * 50)) def test_max_entries(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream, max_entries=1) self.assertEqual(1, outstream.getvalue().count("-" * 50)) class ShowTests(PorcelainTestCase): def test_nolist(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=c3.id, outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_blob(self): b = Blob.from_string(b"The Foo\n") self.repo.object_store.add_object(b) outstream = StringIO() porcelain.show(self.repo.path, objects=[b.id], outstream=outstream) self.assertEqual(outstream.getvalue(), "The Foo\n") def test_commit_no_parent(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) self.repo.object_store.add_objects([(a, None), (ta, None), (ca, None)]) outstream = StringIO() porcelain.show(self.repo.path, objects=[ca.id], outstream=outstream) self.assertMultiLineEqual(outstream.getvalue(), """\ -------------------------------------------------- commit: 344da06c1bb85901270b3e8875c988a027ec087d Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git /dev/null b/somename new mode 100644 index 0000000..ea5c7bf 100644 --- /dev/null +++ b/somename -@@ -1,0 +1,1 @@ +@@ -0,0 +1 @@ +The Foo """) def test_commit_with_change(self): a = Blob.from_string(b"The Foo\n") ta = Tree() ta.add(b"somename", 0o100644, a.id) ca = make_commit(tree=ta.id) b = Blob.from_string(b"The Bar\n") tb = Tree() tb.add(b"somename", 0o100644, b.id) cb = make_commit(tree=tb.id, parents=[ca.id]) self.repo.object_store.add_objects( [(a, None), (b, None), (ta, None), (tb, None), (ca, None), (cb, None)]) outstream = StringIO() porcelain.show(self.repo.path, objects=[cb.id], outstream=outstream) self.assertMultiLineEqual(outstream.getvalue(), """\ -------------------------------------------------- commit: 2c6b6c9cb72c130956657e1fdae58e5b103744fa Author: Test Author Committer: Test Committer Date: Fri Jan 01 2010 00:00:00 +0000 Test message. diff --git a/somename b/somename index ea5c7bf..fd38bcb 100644 --- a/somename +++ b/somename -@@ -1,1 +1,1 @@ +@@ -1 +1 @@ -The Foo +The Bar """) class SymbolicRefTests(PorcelainTestCase): def test_set_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') def test_set_force_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) # test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') def test_set_symbolic_ref(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'master') def test_set_symbolic_ref_other_than_master(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]], attrs=dict(refs='develop')) self.repo.refs[b"HEAD"] = c3.id self.repo.refs[b"refs/heads/develop"] = c3.id porcelain.symbolic_ref(self.repo.path, b'develop') # test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/develop\n') class DiffTreeTests(PorcelainTestCase): def test_empty(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream) self.assertEqual(outstream.getvalue(), b"") class CommitTreeTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) b = Blob() b.data = b"foo the bar" t = Tree() t.add(b"somename", 0o100644, b.id) self.repo.object_store.add_object(t) self.repo.object_store.add_object(b) sha = porcelain.commit_tree( self.repo.path, t.id, message=b"Withcommit.", author=b"Joe ", committer=b"Jane ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class RevListTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) outstream = BytesIO() porcelain.rev_list( self.repo.path, [c3.id], outstream=outstream) self.assertEqual( c3.id + b"\n" + c2.id + b"\n" + c1.id + b"\n", outstream.getvalue()) class TagCreateTests(PorcelainTestCase): def test_annotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b'refs/tags/tryme'] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar", tag.message) self.assertLess(time.time() - tag.tag_time, 5) def test_unannotated(self): c1, c2, c3 = build_commit_graph( self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b'refs/tags/tryme'] self.assertEqual(list(tags.values()), [self.repo.head()]) class TagListTests(PorcelainTestCase): def test_empty(self): tags = porcelain.tag_list(self.repo.path) self.assertEqual([], tags) def test_simple(self): self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 tags = porcelain.tag_list(self.repo.path) self.assertEqual([b"bar/bla", b"foo"], tags) class TagDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.tag_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) porcelain.tag_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) class ResetTests(PorcelainTestCase): def test_hard_head(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: f.write(b"OOH") porcelain.reset(self.repo, "hard", b"HEAD") index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[b'HEAD'].tree)) self.assertEqual([], changes) def test_hard_commit(self): fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=[fullpath]) sha = porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(fullpath, 'wb') as f: f.write(b"BAZ") porcelain.add(self.repo.path, paths=[fullpath]) porcelain.commit(self.repo.path, message=b"Some other message", committer=b"Jane ", author=b"John ") porcelain.reset(self.repo, "hard", sha) index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[sha].tree)) self.assertEqual([], changes) class PushTests(PorcelainTestCase): def test_simple(self): """ Basic test of porcelain push where self.repo is the remote. First clone the remote, commit a file to the clone, then push the changes back to the remote. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'', committer=b'') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) try: self.assertEqual(target_repo[b'HEAD'], self.repo[b'HEAD']) finally: target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[fullpath]) porcelain.commit(repo=clone_path, message=b'push', author=b'', committer=b'') # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b'HEAD'].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, errstream=errstream) # Check that the target and source with Repo(clone_path) as r_clone: self.assertEqual({ b'HEAD': new_id, b'refs/heads/foo': r_clone[b'HEAD'].id, b'refs/heads/master': new_id, }, self.repo.get_refs()) self.assertEqual(r_clone[b'HEAD'].id, self.repo[refs_path].id) # Get the change in the target repo corresponding to the add # this will be in the foo branch. change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, self.repo[b'refs/heads/foo'].tree))[0] self.assertEqual(os.path.basename(fullpath), change.new.path.decode('ascii')) def test_delete(self): """Basic test of porcelain push, removing a branch. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'', committer=b'') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) target_repo.close() # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" new_id = self.repo[b'HEAD'].id self.assertNotEqual(new_id, ZERO_SHA) self.repo.refs[refs_path] = new_id # Push to the remote porcelain.push(clone_path, self.repo.path, b":" + refs_path, outstream=outstream, errstream=errstream) self.assertEqual({ b'HEAD': new_id, b'refs/heads/master': new_id, }, self.repo.get_refs()) class PullTests(PorcelainTestCase): def setUp(self): super(PullTests, self).setUp() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo self.target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, self.target_path) target_repo = porcelain.clone(self.repo.path, target=self.target_path, errstream=BytesIO()) target_repo.close() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') self.assertTrue(b'refs/heads/master' in self.repo.refs) self.assertTrue(b'refs/heads/master' in target_repo.refs) def test_simple(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull(self.target_path, self.repo.path, b'refs/heads/master', outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) def test_no_refspec(self): outstream = BytesIO() errstream = BytesIO() # Pull changes into the cloned repo porcelain.pull(self.target_path, self.repo.path, outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with Repo(self.target_path) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) class StatusTests(PorcelainTestCase): def test_empty(self): results = porcelain.status(self.repo) self.assertEqual( {'add': [], 'delete': [], 'modify': []}, results.staged) self.assertEqual([], results.unstaged) def test_status(self): """Integration test for `status` functionality.""" # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') # modify access and modify time of path os.utime(fullpath, (0, 0)) with open(fullpath, 'wb') as f: f.write(b'stuff') # Make a dummy file and stage it filename_add = 'bar' fullpath = os.path.join(self.repo.path, filename_add) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) results = porcelain.status(self.repo) self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) self.assertEqual(results.unstaged, [b'foo']) def test_get_tree_changes_add(self): """Unit test for get_tree_changes add.""" # Make a dummy file, stage filename = 'bar' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['add'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 1) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_modify(self): """Unit test for get_tree_changes modify.""" # Make a dummy file, stage, commit, modify filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') with open(fullpath, 'w') as f: f.write('otherstuff') porcelain.add(repo=self.repo.path, paths=fullpath) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['modify'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 1) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_delete(self): """Unit test for get_tree_changes delete.""" # Make a dummy file, stage, commit, remove filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') cwd = os.getcwd() try: os.chdir(self.repo.path) porcelain.remove(repo=self.repo.path, paths=[filename]) finally: os.chdir(cwd) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['delete'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 1) def test_get_untracked_paths(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write('ignored\n') with open(os.path.join(self.repo.path, 'ignored'), 'w') as f: f.write('blah\n') with open(os.path.join(self.repo.path, 'notignored'), 'w') as f: f.write('blah\n') self.assertEqual( set(['ignored', 'notignored', '.gitignore']), set(porcelain.get_untracked_paths(self.repo.path, self.repo.path, self.repo.open_index()))) self.assertEqual(set(['.gitignore', 'notignored']), set(porcelain.status(self.repo).untracked)) self.assertEqual(set(['.gitignore', 'notignored', 'ignored']), set(porcelain.status(self.repo, ignored=True) .untracked)) def test_get_untracked_paths_nested(self): with open(os.path.join(self.repo.path, 'notignored'), 'w') as f: f.write('blah\n') subrepo = Repo.init(os.path.join(self.repo.path, 'nested'), mkdir=True) with open(os.path.join(subrepo.path, 'another'), 'w') as f: f.write('foo\n') self.assertEqual( set(['notignored']), set(porcelain.get_untracked_paths(self.repo.path, self.repo.path, self.repo.open_index()))) self.assertEqual( set(['another']), set(porcelain.get_untracked_paths(subrepo.path, subrepo.path, subrepo.open_index()))) # TODO(jelmer): Add test for dulwich.porcelain.daemon class UploadPackTests(PorcelainTestCase): """Tests for upload_pack.""" def test_upload_pack(self): outf = BytesIO() exitcode = porcelain.upload_pack( self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([b"0000"], outlines) self.assertEqual(0, exitcode) class ReceivePackTests(PorcelainTestCase): """Tests for receive_pack.""" def test_receive_pack(self): filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=fullpath) self.repo.do_commit(message=b'test status', author=b'', committer=b'', author_timestamp=1402354300, commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) outf = BytesIO() exitcode = porcelain.receive_pack( self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([ b'00739e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status ' # noqa: E501 b'delete-refs quiet ofs-delta side-band-64k no-done', b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master', b'0000'], outlines) self.assertEqual(0, exitcode) class BranchListTests(PorcelainTestCase): def test_standard(self): self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchCreateTests(PorcelainTestCase): def test_branch_exists(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") porcelain.branch_create(self.repo, b"foo", force=True) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) class FetchTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') self.assertFalse(self.repo[b'HEAD'].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, self.repo.path, outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with Repo(target_path) as r: self.assertTrue(self.repo[b'HEAD'].id in r) class RepackTests(PorcelainTestCase): def test_empty(self): porcelain.repack(self.repo) def test_simple(self): handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) porcelain.add(repo=self.repo.path, paths=fullpath) porcelain.repack(self.repo) class LsTreeTests(PorcelainTestCase): def test_empty(self): porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual(f.getvalue(), "") def test_simple(self): # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=[fullpath]) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') f = StringIO() porcelain.ls_tree(self.repo, b"HEAD", outstream=f) self.assertEqual( f.getvalue(), '100644 blob 8b82634d7eae019850bb883f06abf428c58bc9aa\tfoo\n') class LsRemoteTests(PorcelainTestCase): def test_empty(self): self.assertEqual({}, porcelain.ls_remote(self.repo.path)) def test_some(self): cid = porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') self.assertEqual({ b'refs/heads/master': cid, b'HEAD': cid}, porcelain.ls_remote(self.repo.path)) class RemoteAddTests(PorcelainTestCase): def test_new(self): porcelain.remote_add( self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') c = self.repo.get_config() self.assertEqual( c.get((b'remote', b'jelmer'), b'url'), b'git://jelmer.uk/code/dulwich') def test_exists(self): porcelain.remote_add( self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') self.assertRaises(porcelain.RemoteExists, porcelain.remote_add, self.repo, 'jelmer', 'git://jelmer.uk/code/dulwich') class CheckIgnoreTests(PorcelainTestCase): def test_check_ignored(self): with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write("foo") with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") with open(os.path.join(self.repo.path, 'bar'), 'w') as f: f.write("BAR") self.assertEqual( ['foo'], list(porcelain.check_ignore(self.repo, ['foo']))) self.assertEqual([], list(porcelain.check_ignore(self.repo, ['bar']))) def test_check_added(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") self.repo.stage(['foo']) with open(os.path.join(self.repo.path, '.gitignore'), 'w') as f: f.write("foo\n") self.assertEqual( [], list(porcelain.check_ignore(self.repo, ['foo']))) self.assertEqual( ['foo'], list(porcelain.check_ignore(self.repo, ['foo'], no_index=True)))