diff --git a/dulwich/index.py b/dulwich/index.py index d527d40b..5233f14b 100644 --- a/dulwich/index.py +++ b/dulwich/index.py @@ -1,775 +1,783 @@ # index.py -- File parser/writer for the git index file # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Parser for the git index file format.""" import collections import errno import os import stat import struct import sys from dulwich.file import GitFile from dulwich.objects import ( Blob, S_IFGITLINK, S_ISGITLINK, Tree, hex_to_sha, sha_to_hex, ) from dulwich.pack import ( SHA1Reader, SHA1Writer, ) IndexEntry = collections.namedtuple( 'IndexEntry', [ 'ctime', 'mtime', 'dev', 'ino', 'mode', 'uid', 'gid', 'size', 'sha', 'flags']) FLAG_STAGEMASK = 0x3000 FLAG_VALID = 0x8000 FLAG_EXTENDED = 0x4000 def pathsplit(path): """Split a /-delimited path into a directory part and a basename. :param path: The path to split. :return: Tuple with directory name and basename """ try: (dirname, basename) = path.rsplit(b"/", 1) except ValueError: return (b"", path) else: return (dirname, basename) def pathjoin(*args): """Join a /-delimited path. """ return b"/".join([p for p in args if p]) def read_cache_time(f): """Read a cache time. :param f: File-like object to read from :return: Tuple with seconds and nanoseconds """ return struct.unpack(">LL", f.read(8)) def write_cache_time(f, t): """Write a cache time. :param f: File-like object to write to :param t: Time to write (as int, float or tuple with secs and nsecs) """ if isinstance(t, int): t = (t, 0) elif isinstance(t, float): (secs, nsecs) = divmod(t, 1.0) t = (int(secs), int(nsecs * 1000000000)) elif not isinstance(t, tuple): raise TypeError(t) f.write(struct.pack(">LL", *t)) def read_cache_entry(f): """Read an entry from a cache file. :param f: File-like object to read from :return: tuple with: device, inode, mode, uid, gid, size, sha, flags """ beginoffset = f.tell() ctime = read_cache_time(f) mtime = read_cache_time(f) (dev, ino, mode, uid, gid, size, sha, flags, ) = \ struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2)) name = f.read((flags & 0x0fff)) # Padding: real_size = ((f.tell() - beginoffset + 8) & ~7) f.read((beginoffset + real_size) - f.tell()) return (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha_to_hex(sha), flags & ~0x0fff) def write_cache_entry(f, entry): """Write an index entry to a file. :param f: File object :param entry: Entry to write, tuple with: (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ beginoffset = f.tell() (name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry write_cache_time(f, ctime) write_cache_time(f, mtime) flags = len(name) | (flags & ~0x0fff) f.write(struct.pack( b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF, mode, uid, gid, size, hex_to_sha(sha), flags)) f.write(name) real_size = ((f.tell() - beginoffset + 8) & ~7) f.write(b'\0' * ((beginoffset + real_size) - f.tell())) def read_index(f): """Read an index file, yielding the individual entries.""" header = f.read(4) if header != b'DIRC': raise AssertionError("Invalid index file header: %r" % header) (version, num_entries) = struct.unpack(b'>LL', f.read(4 * 2)) assert version in (1, 2) for i in range(num_entries): yield read_cache_entry(f) def read_index_dict(f): """Read an index file and return it as a dictionary. :param f: File object to read from """ ret = {} for x in read_index(f): ret[x[0]] = IndexEntry(*x[1:]) return ret def write_index(f, entries): """Write an index file. :param f: File-like object to write to :param entries: Iterable over the entries to write """ f.write(b'DIRC') f.write(struct.pack(b'>LL', 2, len(entries))) for x in entries: write_cache_entry(f, x) def write_index_dict(f, entries): """Write an index file based on the contents of a dictionary. """ entries_list = [] for name in sorted(entries): entries_list.append((name,) + tuple(entries[name])) write_index(f, entries_list) def cleanup_mode(mode): """Cleanup a mode value. This will return a mode that can be stored in a tree object. :param mode: Mode to clean up. """ if stat.S_ISLNK(mode): return stat.S_IFLNK elif stat.S_ISDIR(mode): return stat.S_IFDIR elif S_ISGITLINK(mode): return S_IFGITLINK ret = stat.S_IFREG | 0o644 ret |= (mode & 0o111) return ret class Index(object): """A Git Index file.""" def __init__(self, filename): """Open an index file. :param filename: Path to the index file """ self._filename = filename self.clear() self.read() @property def path(self): return self._filename def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self._filename) def write(self): """Write current contents of index to disk.""" f = GitFile(self._filename, 'wb') try: f = SHA1Writer(f) write_index_dict(f, self._byname) finally: f.close() def read(self): """Read current contents of index from disk.""" if not os.path.exists(self._filename): return f = GitFile(self._filename, 'rb') try: f = SHA1Reader(f) for x in read_index(f): self[x[0]] = IndexEntry(*x[1:]) # FIXME: Additional data? f.read(os.path.getsize(self._filename)-f.tell()-20) f.check_sha() finally: f.close() def __len__(self): """Number of entries in this index file.""" return len(self._byname) def __getitem__(self, name): """Retrieve entry by relative path. :return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) """ return self._byname[name] def __iter__(self): """Iterate over the paths in this index.""" return iter(self._byname) def get_sha1(self, path): """Return the (git object) SHA1 for the object at a path.""" return self[path].sha def get_mode(self, path): """Return the POSIX file mode for the object at a path.""" return self[path].mode def iterobjects(self): """Iterate over path, sha, mode tuples for use with commit_tree.""" for path in self: entry = self[path] yield path, entry.sha, cleanup_mode(entry.mode) def iterblobs(self): import warnings warnings.warn(PendingDeprecationWarning, 'Use iterobjects() instead.') return self.iterblobs() def clear(self): """Remove all contents from this index.""" self._byname = {} def __setitem__(self, name, x): assert isinstance(name, bytes) assert len(x) == 10 # Remove the old entry if any self._byname[name] = IndexEntry(*x) def __delitem__(self, name): assert isinstance(name, bytes) del self._byname[name] def iteritems(self): return self._byname.items() def update(self, entries): for name, value in entries.items(): self[name] = value def changes_from_tree(self, object_store, tree, want_unchanged=False): """Find the differences between the contents of this index and a tree. :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ def lookup_entry(path): entry = self[path] return entry.sha, entry.mode for (name, mode, sha) in changes_from_tree( self._byname.keys(), lookup_entry, object_store, tree, want_unchanged=want_unchanged): yield (name, mode, sha) def commit(self, object_store): """Create a new tree from an index. :param object_store: Object store to save the tree in :return: Root tree SHA """ return commit_tree(object_store, self.iterobjects()) def commit_tree(object_store, blobs): """Commit a new tree. :param object_store: Object store to add trees to :param blobs: Iterable over blob path, sha, mode entries :return: SHA1 of the created tree. """ trees = {b'': {}} def add_tree(path): if path in trees: return trees[path] dirname, basename = pathsplit(path) t = add_tree(dirname) assert isinstance(basename, bytes) newtree = {} t[basename] = newtree trees[path] = newtree return newtree for path, sha, mode in blobs: tree_path, basename = pathsplit(path) tree = add_tree(tree_path) tree[basename] = (mode, sha) def build_tree(path): tree = Tree() for basename, entry in trees[path].items(): if isinstance(entry, dict): mode = stat.S_IFDIR sha = build_tree(pathjoin(path, basename)) else: (mode, sha) = entry tree.add(basename, mode, sha) object_store.add_object(tree) return tree.id return build_tree(b'') def commit_index(object_store, index): """Create a new tree from an index. :param object_store: Object store to save the tree in :param index: Index file :note: This function is deprecated, use index.commit() instead. :return: Root tree sha. """ return commit_tree(object_store, index.iterobjects()) def changes_from_tree(names, lookup_entry, object_store, tree, want_unchanged=False): """Find the differences between the contents of a tree and a working copy. :param names: Iterable of names in the working copy :param lookup_entry: Function to lookup an entry in the working copy :param object_store: Object store to use for retrieving tree contents :param tree: SHA1 of the root tree, or None for an empty tree :param want_unchanged: Whether unchanged files should be reported :return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) """ # TODO(jelmer): Support a include_trees option other_names = set(names) if tree is not None: for (name, mode, sha) in object_store.iter_tree_contents(tree): try: (other_sha, other_mode) = lookup_entry(name) except KeyError: # Was removed yield ((name, None), (mode, None), (sha, None)) else: other_names.remove(name) if (want_unchanged or other_sha != sha or other_mode != mode): yield ((name, name), (mode, other_mode), (sha, other_sha)) # Mention added files for name in other_names: try: (other_sha, other_mode) = lookup_entry(name) except KeyError: pass else: yield ((None, name), (None, other_mode), (None, other_sha)) def index_entry_from_stat(stat_val, hex_sha, flags, mode=None): """Create a new index entry from a stat value. :param stat_val: POSIX stat_result instance :param hex_sha: Hex sha of the object :param flags: Index flags """ if mode is None: mode = cleanup_mode(stat_val.st_mode) return IndexEntry( stat_val.st_ctime, stat_val.st_mtime, stat_val.st_dev, stat_val.st_ino, mode, stat_val.st_uid, stat_val.st_gid, stat_val.st_size, hex_sha, flags) def build_file_from_blob(blob, mode, target_path, honor_filemode=True): """Build a file or symlink on disk based on a Git object. :param obj: The git object :param mode: File mode :param target_path: Path to write to :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit :return: stat object for the file """ try: oldstat = os.lstat(target_path) except OSError as e: if e.errno == errno.ENOENT: oldstat = None else: raise contents = blob.as_raw_string() if stat.S_ISLNK(mode): # FIXME: This will fail on Windows. What should we do instead? if oldstat: os.unlink(target_path) if sys.platform == 'win32' and sys.version_info[0] == 3: # os.readlink on Python3 on Windows requires a unicode string. # TODO(jelmer): Don't assume tree_encoding == fs_encoding tree_encoding = sys.getfilesystemencoding() contents = contents.decode(tree_encoding) target_path = target_path.decode(tree_encoding) os.symlink(contents, target_path) else: if oldstat is not None and oldstat.st_size == len(contents): with open(target_path, 'rb') as f: if f.read() == contents: return oldstat with open(target_path, 'wb') as f: # Write out file f.write(contents) if honor_filemode: os.chmod(target_path, mode) return os.lstat(target_path) INVALID_DOTNAMES = (b".git", b".", b"..", b"") def validate_path_element_default(element): return element.lower() not in INVALID_DOTNAMES def validate_path_element_ntfs(element): stripped = element.rstrip(b". ").lower() if stripped in INVALID_DOTNAMES: return False if stripped == b"git~1": return False return True def validate_path(path, element_validator=validate_path_element_default): """Default path validator that just checks for .git/.""" parts = path.split(b"/") for p in parts: if not element_validator(p): return False else: return True def build_index_from_tree(root_path, index_path, object_store, tree_id, honor_filemode=True, validate_path_element=validate_path_element_default): """Generate and materialize index from a tree :param tree_id: Tree to materialize :param root_path: Target dir for materialized index files :param index_path: Target path for generated index :param object_store: Non-empty object store holding tree contents :param honor_filemode: An optional flag to honor core.filemode setting in config file, default is core.filemode=True, change executable bit :param validate_path_element: Function to validate path elements to check out; default just refuses .git and .. directories. :note:: existing index is wiped and contents are not merged in a working dir. Suitable only for fresh clones. """ index = Index(index_path) if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for entry in object_store.iter_tree_contents(tree_id): if not validate_path(entry.path, validate_path_element): continue full_path = _tree_to_fs_path(root_path, entry.path) if not os.path.exists(os.path.dirname(full_path)): os.makedirs(os.path.dirname(full_path)) # TODO(jelmer): Merge new index into working tree if S_ISGITLINK(entry.mode): if not os.path.isdir(full_path): os.mkdir(full_path) st = os.lstat(full_path) # TODO(jelmer): record and return submodule paths else: obj = object_store[entry.sha] st = build_file_from_blob( obj, entry.mode, full_path, honor_filemode=honor_filemode) # Add file to index if not honor_filemode or S_ISGITLINK(entry.mode): # we can not use tuple slicing to build a new tuple, # because on windows that will convert the times to # longs, which causes errors further along st_tuple = (entry.mode, st.st_ino, st.st_dev, st.st_nlink, st.st_uid, st.st_gid, st.st_size, st.st_atime, st.st_mtime, st.st_ctime) st = st.__class__(st_tuple) index[entry.path] = index_entry_from_stat(st, entry.sha, 0) index.write() def blob_from_path_and_stat(fs_path, st): """Create a blob from a path and a stat object. :param fs_path: Full file system path to file :param st: A stat object :return: A `Blob` object """ assert isinstance(fs_path, bytes) blob = Blob() if not stat.S_ISLNK(st.st_mode): with open(fs_path, 'rb') as f: blob.data = f.read() else: if sys.platform == 'win32' and sys.version_info[0] == 3: # os.readlink on Python3 on Windows requires a unicode string. # TODO(jelmer): Don't assume tree_encoding == fs_encoding tree_encoding = sys.getfilesystemencoding() fs_path = fs_path.decode(tree_encoding) blob.data = os.readlink(fs_path).encode(tree_encoding) else: blob.data = os.readlink(fs_path) return blob def read_submodule_head(path): """Read the head commit of a submodule. :param path: path to the submodule :return: HEAD sha, None if not a valid head/repository """ from dulwich.errors import NotGitRepository from dulwich.repo import Repo try: repo = Repo(path) except NotGitRepository: return None try: return repo.head() except KeyError: return None def get_unstaged_changes(index, root_path): """Walk through an index and check for differences against working tree. :param index: index to check :param root_path: path in which to find files :return: iterator over paths with unstaged changes """ # For each entry in the index check the sha1 & ensure not staged if not isinstance(root_path, bytes): root_path = root_path.encode(sys.getfilesystemencoding()) for tree_path, entry in index.iteritems(): full_path = _tree_to_fs_path(root_path, tree_path) try: blob = blob_from_path_and_stat(full_path, os.lstat(full_path)) except OSError as e: if e.errno != errno.ENOENT: raise # The file was removed, so we assume that counts as # different from whatever file used to exist. yield tree_path except IOError as e: if e.errno != errno.EISDIR: raise # This is actually a directory if os.path.exists(os.path.join(tree_path, '.git')): # Submodule head = read_submodule_head(tree_path) if entry.sha != head: yield tree_path else: # The file was changed to a directory, so consider it removed. yield tree_path else: if blob.id != entry.sha: yield tree_path os_sep_bytes = os.sep.encode('ascii') def _tree_to_fs_path(root_path, tree_path): """Convert a git tree path to a file system path. :param root_path: Root filesystem path :param tree_path: Git tree path as bytes :return: File system path. """ assert isinstance(tree_path, bytes) if os_sep_bytes != b'/': sep_corrected_path = tree_path.replace(b'/', os_sep_bytes) else: sep_corrected_path = tree_path return os.path.join(root_path, sep_corrected_path) def _fs_to_tree_path(fs_path, fs_encoding=None): """Convert a file system path to a git tree path. :param fs_path: File system path. :param fs_encoding: File system encoding :return: Git tree path as bytes """ if fs_encoding is None: fs_encoding = sys.getfilesystemencoding() if not isinstance(fs_path, bytes): fs_path_bytes = fs_path.encode(fs_encoding) else: fs_path_bytes = fs_path if os_sep_bytes != b'/': tree_path = fs_path_bytes.replace(os_sep_bytes, b'/') else: tree_path = fs_path_bytes return tree_path -def index_entry_from_path(path): +def index_entry_from_path(path, object_store=None): """Create an index from a filesystem path. This returns an index value for files, symlinks and tree references. for directories and non-existant files it returns None :param path: Path to create an index entry for + :param object_store: Optional object store to + save new blobs in :return: An index entry """ try: st = os.lstat(path) blob = blob_from_path_and_stat(path, st) except EnvironmentError as e: if e.errno == errno.EISDIR: if os.path.exists(os.path.join(path, '.git')): head = read_submodule_head(path) if head is None: return None return index_entry_from_stat( st, head, 0, mode=S_IFGITLINK) else: raise else: raise else: + if object_store is not None: + object_store.add_object(blob) return index_entry_from_stat(st, blob.id, 0) -def iter_fresh_entries(paths, root_path): +def iter_fresh_entries(paths, root_path, object_store=None): """Iterate over current versions of index entries on disk. :param paths: Paths to iterate over :param root_path: Root path to access from + :param store: Optional store to save new blobs in :return: Iterator over path, index_entry """ for path in paths: p = _tree_to_fs_path(root_path, path) try: - entry = index_entry_from_path(p) + entry = index_entry_from_path(p, object_store=object_store) except EnvironmentError as e: if e.errno in (errno.ENOENT, errno.EISDIR): entry = None else: raise yield path, entry def iter_fresh_blobs(index, root_path): """Iterate over versions of blobs on disk referenced by index. Don't use this function; it removes missing entries from index. :param index: Index file :param root_path: Root path to access from :param include_deleted: Include deleted entries with sha and mode set to None :return: Iterator over path, sha, mode """ import warnings warnings.warn(PendingDeprecationWarning, "Use iter_fresh_objects instead.") for entry in iter_fresh_objects( index, root_path, include_deleted=True): if entry[1] is None: del index[entry[0]] else: yield entry -def iter_fresh_objects(paths, root_path, include_deleted=False): +def iter_fresh_objects(paths, root_path, include_deleted=False, + object_store=None): """Iterate over versions of objecs on disk referenced by index. :param index: Index file :param root_path: Root path to access from :param include_deleted: Include deleted entries with sha and mode set to None + :param object_store: Optional object store to report new items to :return: Iterator over path, sha, mode """ - for path, entry in iter_fresh_entries(paths, root_path): + for path, entry in iter_fresh_entries(paths, root_path, + object_store=object_store): if entry is None: if include_deleted: yield path, None, None else: entry = IndexEntry(*entry) yield path, entry.sha, cleanup_mode(entry.mode) def refresh_index(index, root_path): """Refresh the contents of an index. This is the equivalent to running 'git commit -a'. :param index: Index to update :param root_path: Root filesystem path """ for path, entry in iter_fresh_entries(index, root_path): index[path] = path diff --git a/dulwich/tests/test_client.py b/dulwich/tests/test_client.py index 7680bb47..d406d600 100644 --- a/dulwich/tests/test_client.py +++ b/dulwich/tests/test_client.py @@ -1,1115 +1,1114 @@ # test_client.py -- Tests for the git protocol, client side # Copyright (C) 2009 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # from io import BytesIO import base64 import sys import shutil import tempfile import warnings try: from urllib import quote as urlquote except ImportError: from urllib.parse import quote as urlquote try: import urlparse except ImportError: import urllib.parse as urlparse import urllib3 import dulwich from dulwich import ( client, ) from dulwich.client import ( LocalGitClient, TraditionalGitClient, TCPGitClient, SSHGitClient, HttpGitClient, ReportStatusParser, SendPackError, StrangeHostname, SubprocessSSHVendor, PuttySSHVendor, UpdateRefsError, default_urllib3_manager, get_transport_and_path, get_transport_and_path_from_url, ) from dulwich.config import ( ConfigDict, ) from dulwich.tests import ( TestCase, ) from dulwich.protocol import ( TCP_GIT_PORT, Protocol, ) from dulwich.pack import ( pack_objects_to_data, write_pack_data, write_pack_objects, ) from dulwich.objects import ( Commit, Tree ) from dulwich.repo import ( MemoryRepo, Repo, ) from dulwich.tests import skipIf from dulwich.tests.utils import ( open_repo, tear_down_repo, setup_warning_catcher, ) class DummyClient(TraditionalGitClient): def __init__(self, can_read, read, write): self.can_read = can_read self.read = read self.write = write TraditionalGitClient.__init__(self) def _connect(self, service, path): return Protocol(self.read, self.write), self.can_read class DummyPopen(): def __init__(self, *args, **kwards): self.stdin = BytesIO(b"stdin") self.stdout = BytesIO(b"stdout") self.stderr = BytesIO(b"stderr") self.returncode = 0 self.args = args self.kwargs = kwards def communicate(self, *args, **kwards): return ('Running', '') def wait(self, *args, **kwards): return False # TODO(durin42): add unit-level tests of GitClient class GitClientTests(TestCase): def setUp(self): super(GitClientTests, self).setUp() self.rout = BytesIO() self.rin = BytesIO() self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write) def test_caps(self): agent_cap = ( 'agent=dulwich/%d.%d.%d' % dulwich.__version__).encode('ascii') self.assertEqual(set([b'multi_ack', b'side-band-64k', b'ofs-delta', b'thin-pack', b'multi_ack_detailed', agent_cap]), set(self.client._fetch_capabilities)) self.assertEqual(set([b'ofs-delta', b'report-status', b'side-band-64k', agent_cap]), set(self.client._send_capabilities)) def test_archive_ack(self): self.rin.write( b'0009NACK\n' b'0000') self.rin.seek(0) self.client.archive(b'bla', b'HEAD', None, None) self.assertEqual(self.rout.getvalue(), b'0011argument HEAD0000') def test_fetch_empty(self): self.rin.write(b'0000') self.rin.seek(0) def check_heads(heads): self.assertEqual(heads, {}) return [] ret = self.client.fetch_pack(b'/', check_heads, None, None) self.assertEqual({}, ret.refs) self.assertEqual({}, ret.symrefs) def test_fetch_pack_ignores_magic_ref(self): self.rin.write( b'00000000000000000000000000000000000000000000 capabilities^{}' b'\x00 multi_ack ' b'thin-pack side-band side-band-64k ofs-delta shallow no-progress ' b'include-tag\n' b'0000') self.rin.seek(0) def check_heads(heads): self.assertEqual({}, heads) return [] ret = self.client.fetch_pack(b'bla', check_heads, None, None, None) self.assertEqual({}, ret.refs) self.assertEqual({}, ret.symrefs) self.assertEqual(self.rout.getvalue(), b'0000') def test_fetch_pack_none(self): self.rin.write( b'008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack ' b'thin-pack side-band side-band-64k ofs-delta shallow no-progress ' b'include-tag\n' b'0000') self.rin.seek(0) ret = self.client.fetch_pack( b'bla', lambda heads: [], None, None, None) self.assertEqual( {b'HEAD': b'55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7'}, ret.refs) self.assertEqual({}, ret.symrefs) self.assertEqual(self.rout.getvalue(), b'0000') def test_send_pack_no_sideband64k_with_update_ref_error(self): # No side-bank-64k reported by server shouldn't try to parse # side band data pkts = [b'55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}' b'\x00 report-status delete-refs ofs-delta\n', b'', b"unpack ok", b"ng refs/foo/bar pre-receive hook declined", b''] for pkt in pkts: if pkt == b'': self.rin.write(b"0000") else: self.rin.write(("%04x" % (len(pkt)+4)).encode('ascii') + pkt) self.rin.seek(0) tree = Tree() commit = Commit() commit.tree = tree commit.parents = [] commit.author = commit.committer = b'test user' commit.commit_time = commit.author_time = 1174773719 commit.commit_timezone = commit.author_timezone = 0 commit.encoding = b'UTF-8' commit.message = b'test message' def determine_wants(refs): return {b'refs/foo/bar': commit.id, } def generate_pack_data(have, want, ofs_delta=False): return pack_objects_to_data([(commit, None), (tree, ''), ]) self.assertRaises(UpdateRefsError, self.client.send_pack, "blah", determine_wants, generate_pack_data) def test_send_pack_none(self): self.rin.write( b'0078310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/master\x00 report-status delete-refs ' b'side-band-64k quiet ofs-delta\n' b'0000') self.rin.seek(0) def determine_wants(refs): return { b'refs/heads/master': b'310ca9477129b8586fa2afc779c1f57cf64bba6c' } def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b'/', determine_wants, generate_pack_data) self.assertEqual(self.rout.getvalue(), b'0000') def test_send_pack_keep_and_delete(self): self.rin.write( b'0063310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/master\x00report-status delete-refs ofs-delta\n' b'003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n' b'0000000eunpack ok\n' b'0019ok refs/heads/master\n' b'0000') self.rin.seek(0) def determine_wants(refs): return {b'refs/heads/master': b'0' * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b'/', determine_wants, generate_pack_data) self.assertIn( self.rout.getvalue(), [b'007f310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'0000000000000000000000000000000000000000 ' b'refs/heads/master\x00report-status ofs-delta0000', b'007f310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'0000000000000000000000000000000000000000 ' b'refs/heads/master\x00ofs-delta report-status0000']) def test_send_pack_delete_only(self): self.rin.write( b'0063310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/master\x00report-status delete-refs ofs-delta\n' b'0000000eunpack ok\n' b'0019ok refs/heads/master\n' b'0000') self.rin.seek(0) def determine_wants(refs): return {b'refs/heads/master': b'0' * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b'/', determine_wants, generate_pack_data) self.assertIn( self.rout.getvalue(), [b'007f310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'0000000000000000000000000000000000000000 ' b'refs/heads/master\x00report-status ofs-delta0000', b'007f310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'0000000000000000000000000000000000000000 ' b'refs/heads/master\x00ofs-delta report-status0000']) def test_send_pack_new_ref_only(self): self.rin.write( b'0063310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/master\x00report-status delete-refs ofs-delta\n' b'0000000eunpack ok\n' b'0019ok refs/heads/blah12\n' b'0000') self.rin.seek(0) def determine_wants(refs): return { b'refs/heads/blah12': b'310ca9477129b8586fa2afc779c1f57cf64bba6c', b'refs/heads/master': b'310ca9477129b8586fa2afc779c1f57cf64bba6c' } def generate_pack_data(have, want, ofs_delta=False): return 0, [] f = BytesIO() write_pack_objects(f, {}) self.client.send_pack('/', determine_wants, generate_pack_data) self.assertIn( self.rout.getvalue(), [b'007f0000000000000000000000000000000000000000 ' b'310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/blah12\x00report-status ofs-delta0000' + f.getvalue(), b'007f0000000000000000000000000000000000000000 ' b'310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/blah12\x00ofs-delta report-status0000' + f.getvalue()]) def test_send_pack_new_ref(self): self.rin.write( b'0064310ca9477129b8586fa2afc779c1f57cf64bba6c ' b'refs/heads/master\x00 report-status delete-refs ofs-delta\n' b'0000000eunpack ok\n' b'0019ok refs/heads/blah12\n' b'0000') self.rin.seek(0) tree = Tree() commit = Commit() commit.tree = tree commit.parents = [] commit.author = commit.committer = b'test user' commit.commit_time = commit.author_time = 1174773719 commit.commit_timezone = commit.author_timezone = 0 commit.encoding = b'UTF-8' commit.message = b'test message' def determine_wants(refs): return { b'refs/heads/blah12': commit.id, b'refs/heads/master': b'310ca9477129b8586fa2afc779c1f57cf64bba6c' } def generate_pack_data(have, want, ofs_delta=False): return pack_objects_to_data([(commit, None), (tree, b''), ]) f = BytesIO() write_pack_data(f, *generate_pack_data(None, None)) self.client.send_pack(b'/', determine_wants, generate_pack_data) self.assertIn( self.rout.getvalue(), [b'007f0000000000000000000000000000000000000000 ' + commit.id + b' refs/heads/blah12\x00report-status ofs-delta0000' + f.getvalue(), b'007f0000000000000000000000000000000000000000 ' + commit.id + b' refs/heads/blah12\x00ofs-delta report-status0000' + f.getvalue()]) def test_send_pack_no_deleteref_delete_only(self): pkts = [b'310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master' b'\x00 report-status ofs-delta\n', b'', b''] for pkt in pkts: if pkt == b'': self.rin.write(b"0000") else: self.rin.write(("%04x" % (len(pkt)+4)).encode('ascii') + pkt) self.rin.seek(0) def determine_wants(refs): return {b'refs/heads/master': b'0' * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.assertRaises(UpdateRefsError, self.client.send_pack, b"/", determine_wants, generate_pack_data) self.assertEqual(self.rout.getvalue(), b'0000') class TestGetTransportAndPath(TestCase): def test_tcp(self): c, path = get_transport_and_path('git://foo.com/bar/baz') self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual('foo.com', c._host) self.assertEqual(TCP_GIT_PORT, c._port) self.assertEqual('/bar/baz', path) def test_tcp_port(self): c, path = get_transport_and_path('git://foo.com:1234/bar/baz') self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual('foo.com', c._host) self.assertEqual(1234, c._port) self.assertEqual('/bar/baz', path) def test_git_ssh_explicit(self): c, path = get_transport_and_path('git+ssh://foo.com/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/bar/baz', path) def test_ssh_explicit(self): c, path = get_transport_and_path('ssh://foo.com/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/bar/baz', path) def test_ssh_port_explicit(self): c, path = get_transport_and_path( 'git+ssh://foo.com:1234/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(1234, c.port) self.assertEqual('/bar/baz', path) def test_username_and_port_explicit_unknown_scheme(self): c, path = get_transport_and_path( 'unknown://git@server:7999/dply/stuff.git') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('unknown', c.host) self.assertEqual('//git@server:7999/dply/stuff.git', path) def test_username_and_port_explicit(self): c, path = get_transport_and_path( 'ssh://git@server:7999/dply/stuff.git') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('git', c.username) self.assertEqual('server', c.host) self.assertEqual(7999, c.port) self.assertEqual('/dply/stuff.git', path) def test_ssh_abspath_doubleslash(self): c, path = get_transport_and_path('git+ssh://foo.com//bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('//bar/baz', path) def test_ssh_port(self): c, path = get_transport_and_path( 'git+ssh://foo.com:1234/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(1234, c.port) self.assertEqual('/bar/baz', path) def test_ssh_implicit(self): c, path = get_transport_and_path('foo:/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/bar/baz', path) def test_ssh_host(self): c, path = get_transport_and_path('foo.com:/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/bar/baz', path) def test_ssh_user_host(self): c, path = get_transport_and_path('user@foo.com:/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual('user', c.username) self.assertEqual('/bar/baz', path) def test_ssh_relpath(self): c, path = get_transport_and_path('foo:bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('bar/baz', path) def test_ssh_host_relpath(self): c, path = get_transport_and_path('foo.com:bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('bar/baz', path) def test_ssh_user_host_relpath(self): c, path = get_transport_and_path('user@foo.com:bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual('user', c.username) self.assertEqual('bar/baz', path) def test_local(self): c, path = get_transport_and_path('foo.bar/baz') self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual('foo.bar/baz', path) @skipIf(sys.platform != 'win32', 'Behaviour only happens on windows.') def test_local_abs_windows_path(self): c, path = get_transport_and_path('C:\\foo.bar\\baz') self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual('C:\\foo.bar\\baz', path) def test_error(self): # Need to use a known urlparse.uses_netloc URL scheme to get the # expected parsing of the URL on Python versions less than 2.6.5 c, path = get_transport_and_path('prospero://bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) def test_http(self): url = 'https://github.com/jelmer/dulwich' c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual('/jelmer/dulwich', path) def test_http_auth(self): url = 'https://user:passwd@github.com/jelmer/dulwich' c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual('/jelmer/dulwich', path) self.assertEqual('user', c._username) self.assertEqual('passwd', c._password) def test_http_no_auth(self): url = 'https://github.com/jelmer/dulwich' c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual('/jelmer/dulwich', path) self.assertIs(None, c._username) self.assertIs(None, c._password) class TestGetTransportAndPathFromUrl(TestCase): def test_tcp(self): c, path = get_transport_and_path_from_url('git://foo.com/bar/baz') self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual('foo.com', c._host) self.assertEqual(TCP_GIT_PORT, c._port) self.assertEqual('/bar/baz', path) def test_tcp_port(self): c, path = get_transport_and_path_from_url('git://foo.com:1234/bar/baz') self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual('foo.com', c._host) self.assertEqual(1234, c._port) self.assertEqual('/bar/baz', path) def test_ssh_explicit(self): c, path = get_transport_and_path_from_url('git+ssh://foo.com/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/bar/baz', path) def test_ssh_port_explicit(self): c, path = get_transport_and_path_from_url( 'git+ssh://foo.com:1234/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(1234, c.port) self.assertEqual('/bar/baz', path) def test_ssh_homepath(self): c, path = get_transport_and_path_from_url( 'git+ssh://foo.com/~/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual('/~/bar/baz', path) def test_ssh_port_homepath(self): c, path = get_transport_and_path_from_url( 'git+ssh://foo.com:1234/~/bar/baz') self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual('foo.com', c.host) self.assertEqual(1234, c.port) self.assertEqual('/~/bar/baz', path) def test_ssh_host_relpath(self): self.assertRaises( ValueError, get_transport_and_path_from_url, 'foo.com:bar/baz') def test_ssh_user_host_relpath(self): self.assertRaises( ValueError, get_transport_and_path_from_url, 'user@foo.com:bar/baz') def test_local_path(self): self.assertRaises( ValueError, get_transport_and_path_from_url, 'foo.bar/baz') def test_error(self): # Need to use a known urlparse.uses_netloc URL scheme to get the # expected parsing of the URL on Python versions less than 2.6.5 self.assertRaises( ValueError, get_transport_and_path_from_url, 'prospero://bar/baz') def test_http(self): url = 'https://github.com/jelmer/dulwich' c, path = get_transport_and_path_from_url(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual('/jelmer/dulwich', path) def test_file(self): c, path = get_transport_and_path_from_url('file:///home/jelmer/foo') self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual('/home/jelmer/foo', path) class TestSSHVendor(object): def __init__(self): self.host = None self.command = "" self.username = None self.port = None self.password = None self.key_filename = None def run_command(self, host, command, username=None, port=None, password=None, key_filename=None): self.host = host self.command = command self.username = username self.port = port self.password = password self.key_filename = key_filename class Subprocess: pass setattr(Subprocess, 'read', lambda: None) setattr(Subprocess, 'write', lambda: None) setattr(Subprocess, 'close', lambda: None) setattr(Subprocess, 'can_read', lambda: None) return Subprocess() class SSHGitClientTests(TestCase): def setUp(self): super(SSHGitClientTests, self).setUp() self.server = TestSSHVendor() self.real_vendor = client.get_ssh_vendor client.get_ssh_vendor = lambda: self.server self.client = SSHGitClient('git.samba.org') def tearDown(self): super(SSHGitClientTests, self).tearDown() client.get_ssh_vendor = self.real_vendor def test_get_url(self): path = '/tmp/repo.git' c = SSHGitClient('git.samba.org') url = c.get_url(path) self.assertEqual('ssh://git.samba.org/tmp/repo.git', url) def test_get_url_with_username_and_port(self): path = '/tmp/repo.git' c = SSHGitClient('git.samba.org', port=2222, username='user') url = c.get_url(path) self.assertEqual('ssh://user@git.samba.org:2222/tmp/repo.git', url) def test_default_command(self): self.assertEqual( b'git-upload-pack', self.client._get_cmd_path(b'upload-pack')) def test_alternative_command_path(self): self.client.alternative_paths[b'upload-pack'] = ( b'/usr/lib/git/git-upload-pack') self.assertEqual( b'/usr/lib/git/git-upload-pack', self.client._get_cmd_path(b'upload-pack')) def test_alternative_command_path_spaces(self): self.client.alternative_paths[b'upload-pack'] = ( b'/usr/lib/git/git-upload-pack -ibla') self.assertEqual(b"/usr/lib/git/git-upload-pack -ibla", self.client._get_cmd_path(b'upload-pack')) def test_connect(self): server = self.server client = self.client client.username = b"username" client.port = 1337 client._connect(b"command", b"/path/to/repo") self.assertEqual(b"username", server.username) self.assertEqual(1337, server.port) self.assertEqual("git-command '/path/to/repo'", server.command) client._connect(b"relative-command", b"/~/path/to/repo") self.assertEqual("git-relative-command '~/path/to/repo'", server.command) class ReportStatusParserTests(TestCase): def test_invalid_pack(self): parser = ReportStatusParser() parser.handle_packet(b"unpack error - foo bar") parser.handle_packet(b"ok refs/foo/bar") parser.handle_packet(None) self.assertRaises(SendPackError, parser.check) def test_update_refs_error(self): parser = ReportStatusParser() parser.handle_packet(b"unpack ok") parser.handle_packet(b"ng refs/foo/bar need to pull") parser.handle_packet(None) self.assertRaises(UpdateRefsError, parser.check) def test_ok(self): parser = ReportStatusParser() parser.handle_packet(b"unpack ok") parser.handle_packet(b"ok refs/foo/bar") parser.handle_packet(None) parser.check() class LocalGitClientTests(TestCase): def test_get_url(self): path = "/tmp/repo.git" c = LocalGitClient() url = c.get_url(path) self.assertEqual('file:///tmp/repo.git', url) def test_fetch_into_empty(self): c = LocalGitClient() t = MemoryRepo() s = open_repo('a.git') self.addCleanup(tear_down_repo, s) self.assertEqual(s.get_refs(), c.fetch(s.path, t).refs) def test_fetch_empty(self): c = LocalGitClient() s = open_repo('a.git') self.addCleanup(tear_down_repo, s) out = BytesIO() walker = {} ret = c.fetch_pack( s.path, lambda heads: [], graph_walker=walker, pack_data=out.write) self.assertEqual({ b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50' }, ret.refs) self.assertEqual( {b'HEAD': b'refs/heads/master'}, ret.symrefs) self.assertEqual( b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08" b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e", out.getvalue()) def test_fetch_pack_none(self): c = LocalGitClient() s = open_repo('a.git') self.addCleanup(tear_down_repo, s) out = BytesIO() walker = MemoryRepo().get_graph_walker() ret = c.fetch_pack( s.path, lambda heads: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], graph_walker=walker, pack_data=out.write) self.assertEqual({b'HEAD': b'refs/heads/master'}, ret.symrefs) self.assertEqual({ b'HEAD': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/heads/master': b'a90fa2d900a17e99b433217e988c4eb4a2e9a097', b'refs/tags/mytag': b'28237f4dc30d0d462658d6b937b08a0f0b6ef55a', b'refs/tags/mytag-packed': b'b0931cadc54336e78a1d980420e3268903b57a50' }, ret.refs) # Hardcoding is not ideal, but we'll fix that some other day.. self.assertTrue(out.getvalue().startswith( b'PACK\x00\x00\x00\x02\x00\x00\x00\x07')) def test_send_pack_without_changes(self): local = open_repo('a.git') self.addCleanup(tear_down_repo, local) target = open_repo('a.git') self.addCleanup(tear_down_repo, target) self.send_and_verify(b"master", local, target) def test_send_pack_with_changes(self): local = open_repo('a.git') self.addCleanup(tear_down_repo, local) target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) with Repo.init_bare(target_path) as target: self.send_and_verify(b"master", local, target) def test_get_refs(self): local = open_repo('refs.git') self.addCleanup(tear_down_repo, local) client = LocalGitClient() refs = client.get_refs(local.path) self.assertDictEqual(local.refs.as_dict(), refs) def send_and_verify(self, branch, local, target): """Send branch from local to remote repository and verify it worked.""" client = LocalGitClient() ref_name = b"refs/heads/" + branch new_refs = client.send_pack(target.path, lambda _: {ref_name: local.refs[ref_name]}, local.object_store.generate_pack_data) self.assertEqual(local.refs[ref_name], new_refs[ref_name]) obj_local = local.get_object(new_refs[ref_name]) obj_target = target.get_object(new_refs[ref_name]) self.assertEqual(obj_local, obj_target) class HttpGitClientTests(TestCase): @staticmethod def b64encode(s): """Python 2/3 compatible Base64 encoder. Returns string.""" try: return base64.b64encode(s) except TypeError: return base64.b64encode(s.encode('latin1')).decode('ascii') def test_get_url(self): base_url = 'https://github.com/jelmer/dulwich' path = '/jelmer/dulwich' c = HttpGitClient(base_url) url = c.get_url(path) self.assertEqual('https://github.com/jelmer/dulwich', url) def test_get_url_bytes_path(self): base_url = 'https://github.com/jelmer/dulwich' path_bytes = b'/jelmer/dulwich' c = HttpGitClient(base_url) url = c.get_url(path_bytes) self.assertEqual('https://github.com/jelmer/dulwich', url) def test_get_url_with_username_and_passwd(self): base_url = 'https://github.com/jelmer/dulwich' path = '/jelmer/dulwich' c = HttpGitClient(base_url, username='USERNAME', password='PASSWD') url = c.get_url(path) self.assertEqual('https://github.com/jelmer/dulwich', url) def test_init_username_passwd_set(self): url = 'https://github.com/jelmer/dulwich' c = HttpGitClient(url, config=None, username='user', password='passwd') self.assertEqual('user', c._username) self.assertEqual('passwd', c._password) basic_auth = c.pool_manager.headers['authorization'] auth_string = '%s:%s' % ('user', 'passwd') b64_credentials = self.b64encode(auth_string) expected_basic_auth = 'Basic %s' % b64_credentials self.assertEqual(basic_auth, expected_basic_auth) def test_init_no_username_passwd(self): url = 'https://github.com/jelmer/dulwich' c = HttpGitClient(url, config=None) self.assertIs(None, c._username) self.assertIs(None, c._password) self.assertNotIn('authorization', c.pool_manager.headers) def test_from_parsedurl_on_url_with_quoted_credentials(self): original_username = 'john|the|first' quoted_username = urlquote(original_username) original_password = 'Ya#1$2%3' quoted_password = urlquote(original_password) url = 'https://{username}:{password}@github.com/jelmer/dulwich'.format( username=quoted_username, password=quoted_password ) c = HttpGitClient.from_parsedurl(urlparse.urlparse(url)) self.assertEqual(original_username, c._username) self.assertEqual(original_password, c._password) basic_auth = c.pool_manager.headers['authorization'] auth_string = '%s:%s' % (original_username, original_password) b64_credentials = self.b64encode(auth_string) expected_basic_auth = 'Basic %s' % str(b64_credentials) self.assertEqual(basic_auth, expected_basic_auth) class TCPGitClientTests(TestCase): def test_get_url(self): host = 'github.com' path = '/jelmer/dulwich' c = TCPGitClient(host) url = c.get_url(path) self.assertEqual('git://github.com/jelmer/dulwich', url) def test_get_url_with_port(self): host = 'github.com' path = '/jelmer/dulwich' port = 9090 c = TCPGitClient(host, port=port) url = c.get_url(path) self.assertEqual('git://github.com:9090/jelmer/dulwich', url) class DefaultUrllib3ManagerTest(TestCase): def test_no_config(self): manager = default_urllib3_manager(config=None) - pool_keywords = tuple(manager.connection_pool_kw.items()) self.assertEqual(manager.connection_pool_kw['cert_reqs'], 'CERT_REQUIRED') def test_config_no_proxy(self): manager = default_urllib3_manager(config=ConfigDict()) self.assertNotIsInstance(manager, urllib3.ProxyManager) def test_config_ssl(self): config = ConfigDict() config.set(b'http', b'sslVerify', b'true') manager = default_urllib3_manager(config=config) self.assertEqual(manager.connection_pool_kw['cert_reqs'], 'CERT_REQUIRED') def test_config_no_ssl(self): config = ConfigDict() config.set(b'http', b'sslVerify', b'false') manager = default_urllib3_manager(config=config) self.assertEqual(manager.connection_pool_kw['cert_reqs'], 'CERT_NONE') def test_config_proxy(self): config = ConfigDict() config.set(b'http', b'proxy', b'http://localhost:3128/') manager = default_urllib3_manager(config=config) self.assertIsInstance(manager, urllib3.ProxyManager) self.assertTrue(hasattr(manager, 'proxy')) self.assertEqual(manager.proxy.scheme, 'http') self.assertEqual(manager.proxy.host, 'localhost') self.assertEqual(manager.proxy.port, 3128) def test_config_no_verify_ssl(self): manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE") self.assertEqual(manager.connection_pool_kw['cert_reqs'], 'CERT_NONE') class SubprocessSSHVendorTests(TestCase): def setUp(self): # Monkey Patch client subprocess popen self._orig_popen = dulwich.client.subprocess.Popen dulwich.client.subprocess.Popen = DummyPopen def tearDown(self): dulwich.client.subprocess.Popen = self._orig_popen def test_run_command_dashes(self): vendor = SubprocessSSHVendor() self.assertRaises(StrangeHostname, vendor.run_command, '--weird-host', 'git-clone-url') def test_run_command_password(self): vendor = SubprocessSSHVendor() self.assertRaises(NotImplementedError, vendor.run_command, 'host', 'git-clone-url', password='12345') def test_run_command_password_and_privkey(self): vendor = SubprocessSSHVendor() self.assertRaises(NotImplementedError, vendor.run_command, 'host', 'git-clone-url', password='12345', key_filename='/tmp/id_rsa') def test_run_command_with_port_username_and_privkey(self): expected = ['ssh', '-x', '-p', '2200', '-i', '/tmp/id_rsa', 'user@host', 'git-clone-url'] vendor = SubprocessSSHVendor() command = vendor.run_command( 'host', 'git-clone-url', username='user', port='2200', key_filename='/tmp/id_rsa') args = command.proc.args self.assertListEqual(expected, args[0]) class PuttySSHVendorTests(TestCase): def setUp(self): # Monkey Patch client subprocess popen self._orig_popen = dulwich.client.subprocess.Popen dulwich.client.subprocess.Popen = DummyPopen def tearDown(self): dulwich.client.subprocess.Popen = self._orig_popen def test_run_command_dashes(self): vendor = PuttySSHVendor() self.assertRaises(StrangeHostname, vendor.run_command, '--weird-host', 'git-clone-url') def test_run_command_password_and_privkey(self): vendor = PuttySSHVendor() self.assertRaises(NotImplementedError, vendor.run_command, 'host', 'git-clone-url', password='12345', key_filename='/tmp/id_rsa') def test_run_command_password(self): if sys.platform == 'win32': binary = ['putty.exe', '-ssh'] else: binary = ['putty', '-ssh'] expected = binary + ['-pw', '12345', 'host', 'git-clone-url'] vendor = PuttySSHVendor() warnings.simplefilter("always", UserWarning) self.addCleanup(warnings.resetwarnings) warnings_list, restore_warnings = setup_warning_catcher() self.addCleanup(restore_warnings) command = vendor.run_command('host', 'git-clone-url', password='12345') expected_warning = UserWarning( 'Invoking Putty with a password exposes the password in the ' 'process list.') for w in warnings_list: if (type(w) == type(expected_warning) and w.args == expected_warning.args): break else: raise AssertionError( 'Expected warning %r not in %r' % (expected_warning, warnings_list)) args = command.proc.args self.assertListEqual(expected, args[0]) def test_run_command_with_port_username_and_privkey(self): if sys.platform == 'win32': binary = ['putty.exe', '-ssh'] else: binary = ['putty', '-ssh'] expected = binary + [ '-P', '2200', '-i', '/tmp/id_rsa', 'user@host', 'git-clone-url'] vendor = PuttySSHVendor() command = vendor.run_command( 'host', 'git-clone-url', username='user', port='2200', key_filename='/tmp/id_rsa') args = command.proc.args self.assertListEqual(expected, args[0])