diff --git a/dulwich/contrib/test_swift_smoke.py b/dulwich/contrib/test_swift_smoke.py index ac04a6bb..5cb540a9 100644 --- a/dulwich/contrib/test_swift_smoke.py +++ b/dulwich/contrib/test_swift_smoke.py @@ -1,313 +1,313 @@ # test_smoke.py -- Functional tests for the Swift backend. # Copyright (C) 2013 eNovance SAS # # Author: Fabien Boucher # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Start functional tests A Swift installation must be available before starting those tests. The account and authentication method used during this functional tests must be changed in the configuration file passed as environment variable. The container used to create a fake repository is defined in cls.fakerepo and will be deleted after the tests. DULWICH_SWIFT_CFG=/tmp/conf.cfg PYTHONPATH=. python -m unittest \ dulwich.tests_swift.test_smoke """ import os import unittest import tempfile import shutil import gevent from gevent import monkey monkey.patch_all() from dulwich import ( # noqa:E402 server, repo, index, client, objects, ) from dulwich.contrib import swift # noqa:E402 class DulwichServer: """Start the TCPGitServer with Swift backend""" def __init__(self, backend, port): self.port = port self.backend = backend def run(self): self.server = server.TCPGitServer(self.backend, "localhost", port=self.port) self.job = gevent.spawn(self.server.serve_forever) def stop(self): self.server.shutdown() gevent.joinall((self.job,)) class SwiftSystemBackend(server.Backend): def open_repository(self, path): return swift.SwiftRepo(path, conf=swift.load_conf()) class SwiftRepoSmokeTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.backend = SwiftSystemBackend() cls.port = 9148 cls.server_address = "localhost" cls.fakerepo = "fakerepo" cls.th_server = DulwichServer(cls.backend, cls.port) cls.th_server.run() cls.conf = swift.load_conf() @classmethod def tearDownClass(cls): cls.th_server.stop() def setUp(self): self.scon = swift.SwiftConnector(self.fakerepo, self.conf) if self.scon.test_root_exists(): try: self.scon.del_root() except swift.SwiftException: pass self.temp_d = tempfile.mkdtemp() if os.path.isdir(self.temp_d): shutil.rmtree(self.temp_d) def tearDown(self): if self.scon.test_root_exists(): try: self.scon.del_root() except swift.SwiftException: pass if os.path.isdir(self.temp_d): shutil.rmtree(self.temp_d) def test_init_bare(self): swift.SwiftRepo.init_bare(self.scon, self.conf) self.assertTrue(self.scon.test_root_exists()) obj = self.scon.get_container_objects() filtered = [ o for o in obj if o["name"] == "info/refs" or o["name"] == "objects/pack" ] self.assertEqual(len(filtered), 2) def test_clone_bare(self): local_repo = repo.Repo.init(self.temp_d, mkdir=True) swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) remote_refs = tcp_client.fetch(self.fakerepo, local_repo) # The remote repo is empty (no refs retreived) self.assertEqual(remote_refs, None) def test_push_commit(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return {"refs/heads/master": local_repo.refs["HEAD"]} local_repo = repo.Repo.init(self.temp_d, mkdir=True) # Nothing in the staging area local_repo.do_commit("Test commit", "fbo@localhost") sha = local_repo.refs.read_loose_ref("refs/heads/master") swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( self.fakerepo, determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo("fakerepo", self.conf) remote_sha = swift_repo.refs.read_loose_ref("refs/heads/master") self.assertEqual(sha, remote_sha) def test_push_branch(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return {"refs/heads/mybranch": local_repo.refs["refs/heads/mybranch"]} local_repo = repo.Repo.init(self.temp_d, mkdir=True) # Nothing in the staging area local_repo.do_commit("Test commit", "fbo@localhost", ref="refs/heads/mybranch") sha = local_repo.refs.read_loose_ref("refs/heads/mybranch") swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( "/fakerepo", determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo(self.fakerepo, self.conf) remote_sha = swift_repo.refs.read_loose_ref("refs/heads/mybranch") self.assertEqual(sha, remote_sha) def test_push_multiple_branch(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return { "refs/heads/mybranch": local_repo.refs["refs/heads/mybranch"], "refs/heads/master": local_repo.refs["refs/heads/master"], "refs/heads/pullr-108": local_repo.refs["refs/heads/pullr-108"], } local_repo = repo.Repo.init(self.temp_d, mkdir=True) # Nothing in the staging area local_shas = {} remote_shas = {} for branch in ("master", "mybranch", "pullr-108"): local_shas[branch] = local_repo.do_commit( "Test commit %s" % branch, "fbo@localhost", ref="refs/heads/%s" % branch, ) swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( self.fakerepo, determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo("fakerepo", self.conf) for branch in ("master", "mybranch", "pullr-108"): remote_shas[branch] = swift_repo.refs.read_loose_ref( "refs/heads/%s" % branch ) self.assertDictEqual(local_shas, remote_shas) def test_push_data_branch(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return {"refs/heads/master": local_repo.refs["HEAD"]} local_repo = repo.Repo.init(self.temp_d, mkdir=True) os.mkdir(os.path.join(self.temp_d, "dir")) files = ("testfile", "testfile2", "dir/testfile3") i = 0 for f in files: open(os.path.join(self.temp_d, f), "w").write("DATA %s" % i) i += 1 local_repo.stage(files) local_repo.do_commit("Test commit", "fbo@localhost", ref="refs/heads/master") swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( self.fakerepo, determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo("fakerepo", self.conf) commit_sha = swift_repo.refs.read_loose_ref("refs/heads/master") otype, data = swift_repo.object_store.get_raw(commit_sha) commit = objects.ShaFile.from_raw_string(otype, data) otype, data = swift_repo.object_store.get_raw(commit._tree) tree = objects.ShaFile.from_raw_string(otype, data) objs = tree.items() objs_ = [] for tree_entry in objs: objs_.append(swift_repo.object_store.get_raw(tree_entry.sha)) # Blob self.assertEqual(objs_[1][1], "DATA 0") self.assertEqual(objs_[2][1], "DATA 1") # Tree self.assertEqual(objs_[0][0], 2) def test_clone_then_push_data(self): self.test_push_data_branch() shutil.rmtree(self.temp_d) local_repo = repo.Repo.init(self.temp_d, mkdir=True) tcp_client = client.TCPGitClient(self.server_address, port=self.port) remote_refs = tcp_client.fetch(self.fakerepo, local_repo) files = ( os.path.join(self.temp_d, "testfile"), os.path.join(self.temp_d, "testfile2"), ) local_repo["HEAD"] = remote_refs["refs/heads/master"] indexfile = local_repo.index_path() tree = local_repo["HEAD"].tree index.build_index_from_tree( local_repo.path, indexfile, local_repo.object_store, tree ) for f in files: self.assertEqual(os.path.isfile(f), True) - def determine_wants(*args): + def determine_wants(*args, **kwargs): return {"refs/heads/master": local_repo.refs["HEAD"]} os.mkdir(os.path.join(self.temp_d, "test")) files = ("testfile11", "testfile22", "test/testfile33") i = 0 for f in files: open(os.path.join(self.temp_d, f), "w").write("DATA %s" % i) i += 1 local_repo.stage(files) local_repo.do_commit("Test commit", "fbo@localhost", ref="refs/heads/master") tcp_client.send_pack( "/fakerepo", determine_wants, local_repo.generate_pack_data ) def test_push_remove_branch(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return { "refs/heads/pullr-108": objects.ZERO_SHA, "refs/heads/master": local_repo.refs["refs/heads/master"], "refs/heads/mybranch": local_repo.refs["refs/heads/mybranch"], } self.test_push_multiple_branch() local_repo = repo.Repo(self.temp_d) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( self.fakerepo, determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo("fakerepo", self.conf) self.assertNotIn("refs/heads/pullr-108", swift_repo.refs.allkeys()) def test_push_annotated_tag(self): - def determine_wants(*args): + def determine_wants(*args, **kwargs): return { "refs/heads/master": local_repo.refs["HEAD"], "refs/tags/v1.0": local_repo.refs["refs/tags/v1.0"], } local_repo = repo.Repo.init(self.temp_d, mkdir=True) # Nothing in the staging area sha = local_repo.do_commit("Test commit", "fbo@localhost") otype, data = local_repo.object_store.get_raw(sha) commit = objects.ShaFile.from_raw_string(otype, data) tag = objects.Tag() tag.tagger = "fbo@localhost" tag.message = "Annotated tag" tag.tag_timezone = objects.parse_timezone("-0200")[0] tag.tag_time = commit.author_time tag.object = (objects.Commit, commit.id) tag.name = "v0.1" local_repo.object_store.add_object(tag) local_repo.refs["refs/tags/v1.0"] = tag.id swift.SwiftRepo.init_bare(self.scon, self.conf) tcp_client = client.TCPGitClient(self.server_address, port=self.port) tcp_client.send_pack( self.fakerepo, determine_wants, local_repo.generate_pack_data ) swift_repo = swift.SwiftRepo(self.fakerepo, self.conf) tag_sha = swift_repo.refs.read_loose_ref("refs/tags/v1.0") otype, data = swift_repo.object_store.get_raw(tag_sha) rtag = objects.ShaFile.from_raw_string(otype, data) self.assertEqual(rtag.object[1], commit.id) self.assertEqual(rtag.id, tag.id) if __name__ == "__main__": unittest.main() diff --git a/dulwich/tests/compat/test_client.py b/dulwich/tests/compat/test_client.py index 1cb3cc99..965f5c54 100644 --- a/dulwich/tests/compat/test_client.py +++ b/dulwich/tests/compat/test_client.py @@ -1,661 +1,661 @@ # test_client.py -- Compatibilty tests for git client. # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibilty tests between the Dulwich client and the cgit server.""" import copy from io import BytesIO import os import select import signal import stat import subprocess import sys import tarfile import tempfile import threading from urllib.parse import unquote import http.server from dulwich import ( client, file, index, protocol, objects, repo, ) from dulwich.tests import ( SkipTest, expectedFailure, ) from dulwich.tests.compat.utils import ( CompatTestCase, check_for_daemon, import_repo_to_dir, rmtree_ro, run_git_or_fail, _DEFAULT_GIT, ) if sys.platform == "win32": import ctypes class DulwichClientTestBase(object): """Tests for client/server compatibility.""" def setUp(self): self.gitroot = os.path.dirname( import_repo_to_dir("server_new.export").rstrip(os.sep) ) self.dest = os.path.join(self.gitroot, "dest") file.ensure_dir_exists(self.dest) run_git_or_fail(["init", "--quiet", "--bare"], cwd=self.dest) def tearDown(self): rmtree_ro(self.gitroot) def assertDestEqualsSrc(self): repo_dir = os.path.join(self.gitroot, "server_new.export") dest_repo_dir = os.path.join(self.gitroot, "dest") with repo.Repo(repo_dir) as src: with repo.Repo(dest_repo_dir) as dest: self.assertReposEqual(src, dest) def _client(self): raise NotImplementedError() def _build_path(self): raise NotImplementedError() def _do_send_pack(self): c = self._client() srcpath = os.path.join(self.gitroot, "server_new.export") with repo.Repo(srcpath) as src: sendrefs = dict(src.get_refs()) del sendrefs[b"HEAD"] c.send_pack( self._build_path("/dest"), lambda _: sendrefs, src.generate_pack_data, ) def test_send_pack(self): self._do_send_pack() self.assertDestEqualsSrc() def test_send_pack_nothing_to_send(self): self._do_send_pack() self.assertDestEqualsSrc() # nothing to send, but shouldn't raise either. self._do_send_pack() @staticmethod def _add_file(repo, tree_id, filename, contents): tree = repo[tree_id] blob = objects.Blob() blob.data = contents.encode("utf-8") repo.object_store.add_object(blob) tree.add(filename.encode("utf-8"), stat.S_IFREG | 0o644, blob.id) repo.object_store.add_object(tree) return tree.id def test_send_pack_from_shallow_clone(self): c = self._client() server_new_path = os.path.join(self.gitroot, "server_new.export") run_git_or_fail(["config", "http.uploadpack", "true"], cwd=server_new_path) run_git_or_fail(["config", "http.receivepack", "true"], cwd=server_new_path) remote_path = self._build_path("/server_new.export") with repo.Repo(self.dest) as local: result = c.fetch(remote_path, local, depth=1) for r in result.refs.items(): local.refs.set_if_equals(r[0], None, r[1]) tree_id = local[local.head()].tree for filename, contents in [ ("bar", "bar contents"), ("zop", "zop contents"), ]: tree_id = self._add_file(local, tree_id, filename, contents) commit_id = local.do_commit( message=b"add " + filename.encode("utf-8"), committer=b"Joe Example ", tree=tree_id, ) sendrefs = dict(local.get_refs()) del sendrefs[b"HEAD"] c.send_pack(remote_path, lambda _: sendrefs, local.generate_pack_data) with repo.Repo(server_new_path) as remote: self.assertEqual(remote.head(), commit_id) def test_send_without_report_status(self): c = self._client() c._send_capabilities.remove(b"report-status") srcpath = os.path.join(self.gitroot, "server_new.export") with repo.Repo(srcpath) as src: sendrefs = dict(src.get_refs()) del sendrefs[b"HEAD"] c.send_pack( self._build_path("/dest"), lambda _: sendrefs, src.generate_pack_data, ) self.assertDestEqualsSrc() def make_dummy_commit(self, dest): b = objects.Blob.from_string(b"hi") dest.object_store.add_object(b) t = index.commit_tree(dest.object_store, [(b"hi", b.id, 0o100644)]) c = objects.Commit() c.author = c.committer = b"Foo Bar " c.author_time = c.commit_time = 0 c.author_timezone = c.commit_timezone = 0 c.message = b"hi" c.tree = t dest.object_store.add_object(c) return c.id def disable_ff_and_make_dummy_commit(self): # disable non-fast-forward pushes to the server dest = repo.Repo(os.path.join(self.gitroot, "dest")) run_git_or_fail( ["config", "receive.denyNonFastForwards", "true"], cwd=dest.path ) commit_id = self.make_dummy_commit(dest) return dest, commit_id def compute_send(self, src): sendrefs = dict(src.get_refs()) del sendrefs[b"HEAD"] return sendrefs, src.generate_pack_data def test_send_pack_one_error(self): dest, dummy_commit = self.disable_ff_and_make_dummy_commit() dest.refs[b"refs/heads/master"] = dummy_commit repo_dir = os.path.join(self.gitroot, "server_new.export") with repo.Repo(repo_dir) as src: sendrefs, gen_pack = self.compute_send(src) c = self._client() result = c.send_pack( self._build_path("/dest"), lambda _: sendrefs, gen_pack ) self.assertEqual( { b"refs/heads/branch": None, b"refs/heads/master": "non-fast-forward", }, result.ref_status, ) def test_send_pack_multiple_errors(self): dest, dummy = self.disable_ff_and_make_dummy_commit() # set up for two non-ff errors branch, master = b"refs/heads/branch", b"refs/heads/master" dest.refs[branch] = dest.refs[master] = dummy repo_dir = os.path.join(self.gitroot, "server_new.export") with repo.Repo(repo_dir) as src: sendrefs, gen_pack = self.compute_send(src) c = self._client() result = c.send_pack( self._build_path("/dest"), lambda _: sendrefs, gen_pack ) self.assertEqual( {branch: "non-fast-forward", master: "non-fast-forward"}, result.ref_status, ) def test_archive(self): c = self._client() f = BytesIO() c.archive(self._build_path("/server_new.export"), b"HEAD", f.write) f.seek(0) tf = tarfile.open(fileobj=f) self.assertEqual(["baz", "foo"], tf.getnames()) def test_fetch_pack(self): c = self._client() with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch(self._build_path("/server_new.export"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_pack_depth(self): c = self._client() with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch(self._build_path("/server_new.export"), dest, depth=1) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertEqual( dest.get_shallow(), set( [ b"35e0b59e187dd72a0af294aedffc213eaa4d03ff", b"514dc6d3fbfe77361bcaef320c4d21b72bc10be9", ] ), ) def test_repeat(self): c = self._client() with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch(self._build_path("/server_new.export"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() result = c.fetch(self._build_path("/server_new.export"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_empty_pack(self): c = self._client() with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch(self._build_path("/server_new.export"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() - def dw(refs): + def dw(refs, **kwargs): return list(refs.values()) result = c.fetch( self._build_path("/server_new.export"), dest, determine_wants=dw, ) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_incremental_fetch_pack(self): self.test_fetch_pack() dest, dummy = self.disable_ff_and_make_dummy_commit() dest.refs[b"refs/heads/master"] = dummy c = self._client() repo_dir = os.path.join(self.gitroot, "server_new.export") with repo.Repo(repo_dir) as dest: result = c.fetch(self._build_path("/dest"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_pack_no_side_band_64k(self): c = self._client() c._fetch_capabilities.remove(b"side-band-64k") with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch(self._build_path("/server_new.export"), dest) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) self.assertDestEqualsSrc() def test_fetch_pack_zero_sha(self): # zero sha1s are already present on the client, and should # be ignored c = self._client() with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: result = c.fetch( self._build_path("/server_new.export"), dest, - lambda refs: [protocol.ZERO_SHA], + lambda refs, **kwargs: [protocol.ZERO_SHA], ) for r in result.refs.items(): dest.refs.set_if_equals(r[0], None, r[1]) def test_send_remove_branch(self): with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: dummy_commit = self.make_dummy_commit(dest) dest.refs[b"refs/heads/master"] = dummy_commit dest.refs[b"refs/heads/abranch"] = dummy_commit sendrefs = dict(dest.refs) sendrefs[b"refs/heads/abranch"] = b"00" * 20 del sendrefs[b"HEAD"] def gen_pack(have, want, ofs_delta=False): return 0, [] c = self._client() self.assertEqual(dest.refs[b"refs/heads/abranch"], dummy_commit) c.send_pack(self._build_path("/dest"), lambda _: sendrefs, gen_pack) self.assertFalse(b"refs/heads/abranch" in dest.refs) def test_send_new_branch_empty_pack(self): with repo.Repo(os.path.join(self.gitroot, "dest")) as dest: dummy_commit = self.make_dummy_commit(dest) dest.refs[b"refs/heads/master"] = dummy_commit dest.refs[b"refs/heads/abranch"] = dummy_commit sendrefs = {b"refs/heads/bbranch": dummy_commit} def gen_pack(have, want, ofs_delta=False): return 0, [] c = self._client() self.assertEqual(dest.refs[b"refs/heads/abranch"], dummy_commit) c.send_pack(self._build_path("/dest"), lambda _: sendrefs, gen_pack) self.assertEqual(dummy_commit, dest.refs[b"refs/heads/abranch"]) def test_get_refs(self): c = self._client() refs = c.get_refs(self._build_path("/server_new.export")) repo_dir = os.path.join(self.gitroot, "server_new.export") with repo.Repo(repo_dir) as dest: self.assertDictEqual(dest.refs.as_dict(), refs) class DulwichTCPClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) if check_for_daemon(limit=1): raise SkipTest( "git-daemon was already running on port %s" % protocol.TCP_GIT_PORT ) fd, self.pidfile = tempfile.mkstemp( prefix="dulwich-test-git-client", suffix=".pid" ) os.fdopen(fd).close() args = [ _DEFAULT_GIT, "daemon", "--verbose", "--export-all", "--pid-file=%s" % self.pidfile, "--base-path=%s" % self.gitroot, "--enable=receive-pack", "--enable=upload-archive", "--listen=localhost", "--reuseaddr", self.gitroot, ] self.process = subprocess.Popen( args, cwd=self.gitroot, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if not check_for_daemon(): raise SkipTest("git-daemon failed to start") def tearDown(self): with open(self.pidfile) as f: pid = int(f.read().strip()) if sys.platform == "win32": PROCESS_TERMINATE = 1 handle = ctypes.windll.kernel32.OpenProcess(PROCESS_TERMINATE, False, pid) ctypes.windll.kernel32.TerminateProcess(handle, -1) ctypes.windll.kernel32.CloseHandle(handle) else: try: os.kill(pid, signal.SIGKILL) os.unlink(self.pidfile) except (OSError, IOError): pass self.process.wait() self.process.stdout.close() self.process.stderr.close() DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) def _client(self): return client.TCPGitClient("localhost") def _build_path(self, path): return path if sys.platform == "win32": @expectedFailure def test_fetch_pack_no_side_band_64k(self): DulwichClientTestBase.test_fetch_pack_no_side_band_64k(self) class TestSSHVendor(object): @staticmethod def run_command( host, command, username=None, port=None, password=None, key_filename=None, ): cmd, path = command.split(" ") cmd = cmd.split("-", 1) path = path.replace("'", "") p = subprocess.Popen( cmd + [path], bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) return client.SubprocessWrapper(p) class DulwichMockSSHClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) self.real_vendor = client.get_ssh_vendor client.get_ssh_vendor = TestSSHVendor def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) client.get_ssh_vendor = self.real_vendor def _client(self): return client.SSHGitClient("localhost") def _build_path(self, path): return self.gitroot + path class DulwichSubprocessClientTest(CompatTestCase, DulwichClientTestBase): def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) def _client(self): return client.SubprocessGitClient() def _build_path(self, path): return self.gitroot + path class GitHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): """HTTP Request handler that calls out to 'git http-backend'.""" # Make rfile unbuffered -- we need to read one line and then pass # the rest to a subprocess, so we can't use buffered input. rbufsize = 0 def do_POST(self): self.run_backend() def do_GET(self): self.run_backend() def send_head(self): return self.run_backend() def log_request(self, code="-", size="-"): # Let's be quiet, the test suite is noisy enough already pass def run_backend(self): # noqa: C901 """Call out to git http-backend.""" # Based on CGIHTTPServer.CGIHTTPRequestHandler.run_cgi: # Copyright (c) 2001-2010 Python Software Foundation; # All Rights Reserved # Licensed under the Python Software Foundation License. rest = self.path # find an explicit query string, if present. i = rest.rfind("?") if i >= 0: rest, query = rest[:i], rest[i + 1 :] else: query = "" env = copy.deepcopy(os.environ) env["SERVER_SOFTWARE"] = self.version_string() env["SERVER_NAME"] = self.server.server_name env["GATEWAY_INTERFACE"] = "CGI/1.1" env["SERVER_PROTOCOL"] = self.protocol_version env["SERVER_PORT"] = str(self.server.server_port) env["GIT_PROJECT_ROOT"] = self.server.root_path env["GIT_HTTP_EXPORT_ALL"] = "1" env["REQUEST_METHOD"] = self.command uqrest = unquote(rest) env["PATH_INFO"] = uqrest env["SCRIPT_NAME"] = "/" if query: env["QUERY_STRING"] = query host = self.address_string() if host != self.client_address[0]: env["REMOTE_HOST"] = host env["REMOTE_ADDR"] = self.client_address[0] authorization = self.headers.get("authorization") if authorization: authorization = authorization.split() if len(authorization) == 2: import base64 import binascii env["AUTH_TYPE"] = authorization[0] if authorization[0].lower() == "basic": try: authorization = base64.decodestring(authorization[1]) except binascii.Error: pass else: authorization = authorization.split(":") if len(authorization) == 2: env["REMOTE_USER"] = authorization[0] # XXX REMOTE_IDENT content_type = self.headers.get("content-type") if content_type: env["CONTENT_TYPE"] = content_type length = self.headers.get("content-length") if length: env["CONTENT_LENGTH"] = length referer = self.headers.get("referer") if referer: env["HTTP_REFERER"] = referer accept = [] for line in self.headers.getallmatchingheaders("accept"): if line[:1] in "\t\n\r ": accept.append(line.strip()) else: accept = accept + line[7:].split(",") env["HTTP_ACCEPT"] = ",".join(accept) ua = self.headers.get("user-agent") if ua: env["HTTP_USER_AGENT"] = ua co = self.headers.get("cookie") if co: env["HTTP_COOKIE"] = co # XXX Other HTTP_* headers # Since we're setting the env in the parent, provide empty # values to override previously set values for k in ( "QUERY_STRING", "REMOTE_HOST", "CONTENT_LENGTH", "HTTP_USER_AGENT", "HTTP_COOKIE", "HTTP_REFERER", ): env.setdefault(k, "") self.wfile.write(b"HTTP/1.1 200 Script output follows\r\n") self.wfile.write(("Server: %s\r\n" % self.server.server_name).encode("ascii")) self.wfile.write(("Date: %s\r\n" % self.date_time_string()).encode("ascii")) decoded_query = query.replace("+", " ") try: nbytes = int(length) except (TypeError, ValueError): nbytes = 0 if self.command.lower() == "post" and nbytes > 0: data = self.rfile.read(nbytes) else: data = None env["CONTENT_LENGTH"] = "0" # throw away additional data [see bug #427345] while select.select([self.rfile._sock], [], [], 0)[0]: if not self.rfile._sock.recv(1): break args = ["http-backend"] if "=" not in decoded_query: args.append(decoded_query) stdout = run_git_or_fail(args, input=data, env=env, stderr=subprocess.PIPE) self.wfile.write(stdout) class HTTPGitServer(http.server.HTTPServer): allow_reuse_address = True def __init__(self, server_address, root_path): http.server.HTTPServer.__init__(self, server_address, GitHTTPRequestHandler) self.root_path = root_path self.server_name = "localhost" def get_url(self): return "http://%s:%s/" % (self.server_name, self.server_port) class DulwichHttpClientTest(CompatTestCase, DulwichClientTestBase): min_git_version = (1, 7, 0, 2) def setUp(self): CompatTestCase.setUp(self) DulwichClientTestBase.setUp(self) self._httpd = HTTPGitServer(("localhost", 0), self.gitroot) self.addCleanup(self._httpd.shutdown) threading.Thread(target=self._httpd.serve_forever).start() run_git_or_fail(["config", "http.uploadpack", "true"], cwd=self.dest) run_git_or_fail(["config", "http.receivepack", "true"], cwd=self.dest) def tearDown(self): DulwichClientTestBase.tearDown(self) CompatTestCase.tearDown(self) self._httpd.shutdown() self._httpd.socket.close() def _client(self): return client.HttpGitClient(self._httpd.get_url()) def _build_path(self, path): return path def test_archive(self): raise SkipTest("exporting archives not supported over http") diff --git a/dulwich/tests/test_client.py b/dulwich/tests/test_client.py index b844f5e8..84f8fa6c 100644 --- a/dulwich/tests/test_client.py +++ b/dulwich/tests/test_client.py @@ -1,1489 +1,1489 @@ # test_client.py -- Tests for the git protocol, client side # Copyright (C) 2009 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # from io import BytesIO import base64 import os import sys import shutil import tempfile import warnings from urllib.parse import ( quote as urlquote, urlparse, ) import dulwich from dulwich import ( client, ) from dulwich.client import ( InvalidWants, LocalGitClient, TraditionalGitClient, TCPGitClient, SSHGitClient, HttpGitClient, FetchPackResult, ReportStatusParser, SendPackError, StrangeHostname, SubprocessSSHVendor, PLinkSSHVendor, HangupException, GitProtocolError, check_wants, default_urllib3_manager, get_credentials_from_store, get_transport_and_path, get_transport_and_path_from_url, parse_rsync_url, _remote_error_from_stderr, ) from dulwich.config import ( ConfigDict, ) from dulwich.tests import ( TestCase, ) from dulwich.protocol import ( TCP_GIT_PORT, Protocol, ) from dulwich.pack import ( pack_objects_to_data, write_pack_data, write_pack_objects, ) from dulwich.objects import Commit, Tree from dulwich.repo import ( MemoryRepo, Repo, ) from dulwich.tests import skipIf from dulwich.tests.utils import ( open_repo, tear_down_repo, setup_warning_catcher, ) class DummyClient(TraditionalGitClient): def __init__(self, can_read, read, write): self.can_read = can_read self.read = read self.write = write TraditionalGitClient.__init__(self) def _connect(self, service, path): return Protocol(self.read, self.write), self.can_read, None class DummyPopen: def __init__(self, *args, **kwards): self.stdin = BytesIO(b"stdin") self.stdout = BytesIO(b"stdout") self.stderr = BytesIO(b"stderr") self.returncode = 0 self.args = args self.kwargs = kwards def communicate(self, *args, **kwards): return ("Running", "") def wait(self, *args, **kwards): return False # TODO(durin42): add unit-level tests of GitClient class GitClientTests(TestCase): def setUp(self): super(GitClientTests, self).setUp() self.rout = BytesIO() self.rin = BytesIO() self.client = DummyClient(lambda x: True, self.rin.read, self.rout.write) def test_caps(self): agent_cap = ("agent=dulwich/%d.%d.%d" % dulwich.__version__).encode("ascii") self.assertEqual( set( [ b"multi_ack", b"side-band-64k", b"ofs-delta", b"thin-pack", b"multi_ack_detailed", b"shallow", agent_cap, ] ), set(self.client._fetch_capabilities), ) self.assertEqual( set( [ b"delete-refs", b"ofs-delta", b"report-status", b"side-band-64k", agent_cap, ] ), set(self.client._send_capabilities), ) def test_archive_ack(self): self.rin.write(b"0009NACK\n" b"0000") self.rin.seek(0) self.client.archive(b"bla", b"HEAD", None, None) self.assertEqual(self.rout.getvalue(), b"0011argument HEAD0000") def test_fetch_empty(self): self.rin.write(b"0000") self.rin.seek(0) - def check_heads(heads): + def check_heads(heads, **kwargs): self.assertEqual(heads, {}) return [] ret = self.client.fetch_pack(b"/", check_heads, None, None) self.assertEqual({}, ret.refs) self.assertEqual({}, ret.symrefs) def test_fetch_pack_ignores_magic_ref(self): self.rin.write( b"00000000000000000000000000000000000000000000 capabilities^{}" b"\x00 multi_ack " b"thin-pack side-band side-band-64k ofs-delta shallow no-progress " b"include-tag\n" b"0000" ) self.rin.seek(0) - def check_heads(heads): + def check_heads(heads, **kwargs): self.assertEqual({}, heads) return [] ret = self.client.fetch_pack(b"bla", check_heads, None, None, None) self.assertEqual({}, ret.refs) self.assertEqual({}, ret.symrefs) self.assertEqual(self.rout.getvalue(), b"0000") def test_fetch_pack_none(self): self.rin.write( b"008855dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 HEAD\x00multi_ack " b"thin-pack side-band side-band-64k ofs-delta shallow no-progress " b"include-tag\n" b"0000" ) self.rin.seek(0) - ret = self.client.fetch_pack(b"bla", lambda heads: [], None, None, None) + ret = self.client.fetch_pack(b"bla", lambda heads, **kwargs: [], None, None, None) self.assertEqual( {b"HEAD": b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7"}, ret.refs ) self.assertEqual({}, ret.symrefs) self.assertEqual(self.rout.getvalue(), b"0000") def test_send_pack_no_sideband64k_with_update_ref_error(self): # No side-bank-64k reported by server shouldn't try to parse # side band data pkts = [ b"55dcc6bf963f922e1ed5c4bbaaefcfacef57b1d7 capabilities^{}" b"\x00 report-status delete-refs ofs-delta\n", b"", b"unpack ok", b"ng refs/foo/bar pre-receive hook declined", b"", ] for pkt in pkts: if pkt == b"": self.rin.write(b"0000") else: self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt) self.rin.seek(0) tree = Tree() commit = Commit() commit.tree = tree commit.parents = [] commit.author = commit.committer = b"test user" commit.commit_time = commit.author_time = 1174773719 commit.commit_timezone = commit.author_timezone = 0 commit.encoding = b"UTF-8" commit.message = b"test message" def update_refs(refs): return { b"refs/foo/bar": commit.id, } def generate_pack_data(have, want, ofs_delta=False): return pack_objects_to_data( [ (commit, None), (tree, ""), ] ) result = self.client.send_pack("blah", update_refs, generate_pack_data) self.assertEqual( {b"refs/foo/bar": "pre-receive hook declined"}, result.ref_status ) self.assertEqual({b"refs/foo/bar": commit.id}, result.refs) def test_send_pack_none(self): # Set ref to current value self.rin.write( b"0078310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/master\x00 report-status delete-refs " b"side-band-64k quiet ofs-delta\n" b"0000" ) self.rin.seek(0) def update_refs(refs): return {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b"/", update_refs, generate_pack_data) self.assertEqual(self.rout.getvalue(), b"0000") def test_send_pack_keep_and_delete(self): self.rin.write( b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/master\x00report-status delete-refs ofs-delta\n" b"003f310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/keepme\n" b"0000000eunpack ok\n" b"0019ok refs/heads/master\n" b"0000" ) self.rin.seek(0) def update_refs(refs): return {b"refs/heads/master": b"0" * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b"/", update_refs, generate_pack_data) self.assertEqual( self.rout.getvalue(), b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c " b"0000000000000000000000000000000000000000 " b"refs/heads/master\x00delete-refs ofs-delta report-status0000", ) def test_send_pack_delete_only(self): self.rin.write( b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/master\x00report-status delete-refs ofs-delta\n" b"0000000eunpack ok\n" b"0019ok refs/heads/master\n" b"0000" ) self.rin.seek(0) def update_refs(refs): return {b"refs/heads/master": b"0" * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] self.client.send_pack(b"/", update_refs, generate_pack_data) self.assertEqual( self.rout.getvalue(), b"008b310ca9477129b8586fa2afc779c1f57cf64bba6c " b"0000000000000000000000000000000000000000 " b"refs/heads/master\x00delete-refs ofs-delta report-status0000", ) def test_send_pack_new_ref_only(self): self.rin.write( b"0063310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/master\x00report-status delete-refs ofs-delta\n" b"0000000eunpack ok\n" b"0019ok refs/heads/blah12\n" b"0000" ) self.rin.seek(0) def update_refs(refs): return { b"refs/heads/blah12": b"310ca9477129b8586fa2afc779c1f57cf64bba6c", b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c", } def generate_pack_data(have, want, ofs_delta=False): return 0, [] f = BytesIO() write_pack_objects(f, {}) self.client.send_pack("/", update_refs, generate_pack_data) self.assertEqual( self.rout.getvalue(), b"008b0000000000000000000000000000000000000000 " b"310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/blah12\x00delete-refs ofs-delta report-status0000" + f.getvalue(), ) def test_send_pack_new_ref(self): self.rin.write( b"0064310ca9477129b8586fa2afc779c1f57cf64bba6c " b"refs/heads/master\x00 report-status delete-refs ofs-delta\n" b"0000000eunpack ok\n" b"0019ok refs/heads/blah12\n" b"0000" ) self.rin.seek(0) tree = Tree() commit = Commit() commit.tree = tree commit.parents = [] commit.author = commit.committer = b"test user" commit.commit_time = commit.author_time = 1174773719 commit.commit_timezone = commit.author_timezone = 0 commit.encoding = b"UTF-8" commit.message = b"test message" def update_refs(refs): return { b"refs/heads/blah12": commit.id, b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c", } def generate_pack_data(have, want, ofs_delta=False): return pack_objects_to_data( [ (commit, None), (tree, b""), ] ) f = BytesIO() write_pack_data(f, *generate_pack_data(None, None)) self.client.send_pack(b"/", update_refs, generate_pack_data) self.assertEqual( self.rout.getvalue(), b"008b0000000000000000000000000000000000000000 " + commit.id + b" refs/heads/blah12\x00delete-refs ofs-delta report-status0000" + f.getvalue(), ) def test_send_pack_no_deleteref_delete_only(self): pkts = [ b"310ca9477129b8586fa2afc779c1f57cf64bba6c refs/heads/master" b"\x00 report-status ofs-delta\n", b"", b"", ] for pkt in pkts: if pkt == b"": self.rin.write(b"0000") else: self.rin.write(("%04x" % (len(pkt) + 4)).encode("ascii") + pkt) self.rin.seek(0) def update_refs(refs): return {b"refs/heads/master": b"0" * 40} def generate_pack_data(have, want, ofs_delta=False): return 0, [] result = self.client.send_pack(b"/", update_refs, generate_pack_data) self.assertEqual( result.ref_status, {b"refs/heads/master": "remote does not support deleting refs"}, ) self.assertEqual( result.refs, {b"refs/heads/master": b"310ca9477129b8586fa2afc779c1f57cf64bba6c"}, ) self.assertEqual(self.rout.getvalue(), b"0000") class TestGetTransportAndPath(TestCase): def test_tcp(self): c, path = get_transport_and_path("git://foo.com/bar/baz") self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual("foo.com", c._host) self.assertEqual(TCP_GIT_PORT, c._port) self.assertEqual("/bar/baz", path) def test_tcp_port(self): c, path = get_transport_and_path("git://foo.com:1234/bar/baz") self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual("foo.com", c._host) self.assertEqual(1234, c._port) self.assertEqual("/bar/baz", path) def test_git_ssh_explicit(self): c, path = get_transport_and_path("git+ssh://foo.com/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/bar/baz", path) def test_ssh_explicit(self): c, path = get_transport_and_path("ssh://foo.com/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/bar/baz", path) def test_ssh_port_explicit(self): c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(1234, c.port) self.assertEqual("/bar/baz", path) def test_username_and_port_explicit_unknown_scheme(self): c, path = get_transport_and_path("unknown://git@server:7999/dply/stuff.git") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("unknown", c.host) self.assertEqual("//git@server:7999/dply/stuff.git", path) def test_username_and_port_explicit(self): c, path = get_transport_and_path("ssh://git@server:7999/dply/stuff.git") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("git", c.username) self.assertEqual("server", c.host) self.assertEqual(7999, c.port) self.assertEqual("/dply/stuff.git", path) def test_ssh_abspath_doubleslash(self): c, path = get_transport_and_path("git+ssh://foo.com//bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("//bar/baz", path) def test_ssh_port(self): c, path = get_transport_and_path("git+ssh://foo.com:1234/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(1234, c.port) self.assertEqual("/bar/baz", path) def test_ssh_implicit(self): c, path = get_transport_and_path("foo:/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/bar/baz", path) def test_ssh_host(self): c, path = get_transport_and_path("foo.com:/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/bar/baz", path) def test_ssh_user_host(self): c, path = get_transport_and_path("user@foo.com:/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual("user", c.username) self.assertEqual("/bar/baz", path) def test_ssh_relpath(self): c, path = get_transport_and_path("foo:bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("bar/baz", path) def test_ssh_host_relpath(self): c, path = get_transport_and_path("foo.com:bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("bar/baz", path) def test_ssh_user_host_relpath(self): c, path = get_transport_and_path("user@foo.com:bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual("user", c.username) self.assertEqual("bar/baz", path) def test_local(self): c, path = get_transport_and_path("foo.bar/baz") self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual("foo.bar/baz", path) @skipIf(sys.platform != "win32", "Behaviour only happens on windows.") def test_local_abs_windows_path(self): c, path = get_transport_and_path("C:\\foo.bar\\baz") self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual("C:\\foo.bar\\baz", path) def test_error(self): # Need to use a known urlparse.uses_netloc URL scheme to get the # expected parsing of the URL on Python versions less than 2.6.5 c, path = get_transport_and_path("prospero://bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) def test_http(self): url = "https://github.com/jelmer/dulwich" c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) def test_http_auth(self): url = "https://user:passwd@github.com/jelmer/dulwich" c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) self.assertEqual("user", c._username) self.assertEqual("passwd", c._password) def test_http_auth_with_username(self): url = "https://github.com/jelmer/dulwich" c, path = get_transport_and_path(url, username="user2", password="blah") self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) self.assertEqual("user2", c._username) self.assertEqual("blah", c._password) def test_http_auth_with_username_and_in_url(self): url = "https://user:passwd@github.com/jelmer/dulwich" c, path = get_transport_and_path(url, username="user2", password="blah") self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) self.assertEqual("user", c._username) self.assertEqual("passwd", c._password) def test_http_no_auth(self): url = "https://github.com/jelmer/dulwich" c, path = get_transport_and_path(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) self.assertIs(None, c._username) self.assertIs(None, c._password) class TestGetTransportAndPathFromUrl(TestCase): def test_tcp(self): c, path = get_transport_and_path_from_url("git://foo.com/bar/baz") self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual("foo.com", c._host) self.assertEqual(TCP_GIT_PORT, c._port) self.assertEqual("/bar/baz", path) def test_tcp_port(self): c, path = get_transport_and_path_from_url("git://foo.com:1234/bar/baz") self.assertTrue(isinstance(c, TCPGitClient)) self.assertEqual("foo.com", c._host) self.assertEqual(1234, c._port) self.assertEqual("/bar/baz", path) def test_ssh_explicit(self): c, path = get_transport_and_path_from_url("git+ssh://foo.com/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/bar/baz", path) def test_ssh_port_explicit(self): c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(1234, c.port) self.assertEqual("/bar/baz", path) def test_ssh_homepath(self): c, path = get_transport_and_path_from_url("git+ssh://foo.com/~/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(None, c.port) self.assertEqual(None, c.username) self.assertEqual("/~/bar/baz", path) def test_ssh_port_homepath(self): c, path = get_transport_and_path_from_url("git+ssh://foo.com:1234/~/bar/baz") self.assertTrue(isinstance(c, SSHGitClient)) self.assertEqual("foo.com", c.host) self.assertEqual(1234, c.port) self.assertEqual("/~/bar/baz", path) def test_ssh_host_relpath(self): self.assertRaises( ValueError, get_transport_and_path_from_url, "foo.com:bar/baz" ) def test_ssh_user_host_relpath(self): self.assertRaises( ValueError, get_transport_and_path_from_url, "user@foo.com:bar/baz" ) def test_local_path(self): self.assertRaises(ValueError, get_transport_and_path_from_url, "foo.bar/baz") def test_error(self): # Need to use a known urlparse.uses_netloc URL scheme to get the # expected parsing of the URL on Python versions less than 2.6.5 self.assertRaises( ValueError, get_transport_and_path_from_url, "prospero://bar/baz" ) def test_http(self): url = "https://github.com/jelmer/dulwich" c, path = get_transport_and_path_from_url(url) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("https://github.com", c.get_url(b"/")) self.assertEqual("/jelmer/dulwich", path) def test_http_port(self): url = "https://github.com:9090/jelmer/dulwich" c, path = get_transport_and_path_from_url(url) self.assertEqual("https://github.com:9090", c.get_url(b"/")) self.assertTrue(isinstance(c, HttpGitClient)) self.assertEqual("/jelmer/dulwich", path) def test_file(self): c, path = get_transport_and_path_from_url("file:///home/jelmer/foo") self.assertTrue(isinstance(c, LocalGitClient)) self.assertEqual("/home/jelmer/foo", path) class TestSSHVendor(object): def __init__(self): self.host = None self.command = "" self.username = None self.port = None self.password = None self.key_filename = None def run_command( self, host, command, username=None, port=None, password=None, key_filename=None, ): self.host = host self.command = command self.username = username self.port = port self.password = password self.key_filename = key_filename class Subprocess: pass setattr(Subprocess, "read", lambda: None) setattr(Subprocess, "write", lambda: None) setattr(Subprocess, "close", lambda: None) setattr(Subprocess, "can_read", lambda: None) return Subprocess() class SSHGitClientTests(TestCase): def setUp(self): super(SSHGitClientTests, self).setUp() self.server = TestSSHVendor() self.real_vendor = client.get_ssh_vendor client.get_ssh_vendor = lambda: self.server self.client = SSHGitClient("git.samba.org") def tearDown(self): super(SSHGitClientTests, self).tearDown() client.get_ssh_vendor = self.real_vendor def test_get_url(self): path = "/tmp/repo.git" c = SSHGitClient("git.samba.org") url = c.get_url(path) self.assertEqual("ssh://git.samba.org/tmp/repo.git", url) def test_get_url_with_username_and_port(self): path = "/tmp/repo.git" c = SSHGitClient("git.samba.org", port=2222, username="user") url = c.get_url(path) self.assertEqual("ssh://user@git.samba.org:2222/tmp/repo.git", url) def test_default_command(self): self.assertEqual(b"git-upload-pack", self.client._get_cmd_path(b"upload-pack")) def test_alternative_command_path(self): self.client.alternative_paths[b"upload-pack"] = b"/usr/lib/git/git-upload-pack" self.assertEqual( b"/usr/lib/git/git-upload-pack", self.client._get_cmd_path(b"upload-pack"), ) def test_alternative_command_path_spaces(self): self.client.alternative_paths[ b"upload-pack" ] = b"/usr/lib/git/git-upload-pack -ibla" self.assertEqual( b"/usr/lib/git/git-upload-pack -ibla", self.client._get_cmd_path(b"upload-pack"), ) def test_connect(self): server = self.server client = self.client client.username = b"username" client.port = 1337 client._connect(b"command", b"/path/to/repo") self.assertEqual(b"username", server.username) self.assertEqual(1337, server.port) self.assertEqual("git-command '/path/to/repo'", server.command) client._connect(b"relative-command", b"/~/path/to/repo") self.assertEqual("git-relative-command '~/path/to/repo'", server.command) class ReportStatusParserTests(TestCase): def test_invalid_pack(self): parser = ReportStatusParser() parser.handle_packet(b"unpack error - foo bar") parser.handle_packet(b"ok refs/foo/bar") parser.handle_packet(None) self.assertRaises(SendPackError, list, parser.check()) def test_update_refs_error(self): parser = ReportStatusParser() parser.handle_packet(b"unpack ok") parser.handle_packet(b"ng refs/foo/bar need to pull") parser.handle_packet(None) self.assertEqual([(b"refs/foo/bar", "need to pull")], list(parser.check())) def test_ok(self): parser = ReportStatusParser() parser.handle_packet(b"unpack ok") parser.handle_packet(b"ok refs/foo/bar") parser.handle_packet(None) self.assertEqual([(b"refs/foo/bar", None)], list(parser.check())) class LocalGitClientTests(TestCase): def test_get_url(self): path = "/tmp/repo.git" c = LocalGitClient() url = c.get_url(path) self.assertEqual("file:///tmp/repo.git", url) def test_fetch_into_empty(self): c = LocalGitClient() t = MemoryRepo() s = open_repo("a.git") self.addCleanup(tear_down_repo, s) self.assertEqual(s.get_refs(), c.fetch(s.path, t).refs) def test_fetch_empty(self): c = LocalGitClient() s = open_repo("a.git") self.addCleanup(tear_down_repo, s) out = BytesIO() walker = {} ret = c.fetch_pack( - s.path, lambda heads: [], graph_walker=walker, pack_data=out.write + s.path, lambda heads, **kwargs: [], graph_walker=walker, pack_data=out.write ) self.assertEqual( { b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50", }, ret.refs, ) self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs) self.assertEqual( b"PACK\x00\x00\x00\x02\x00\x00\x00\x00\x02\x9d\x08" b"\x82;\xd8\xa8\xea\xb5\x10\xadj\xc7\\\x82<\xfd>\xd3\x1e", out.getvalue(), ) def test_fetch_pack_none(self): c = LocalGitClient() s = open_repo("a.git") self.addCleanup(tear_down_repo, s) out = BytesIO() walker = MemoryRepo().get_graph_walker() ret = c.fetch_pack( s.path, - lambda heads: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], + lambda heads, **kwargs: [b"a90fa2d900a17e99b433217e988c4eb4a2e9a097"], graph_walker=walker, pack_data=out.write, ) self.assertEqual({b"HEAD": b"refs/heads/master"}, ret.symrefs) self.assertEqual( { b"HEAD": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", b"refs/heads/master": b"a90fa2d900a17e99b433217e988c4eb4a2e9a097", b"refs/tags/mytag": b"28237f4dc30d0d462658d6b937b08a0f0b6ef55a", b"refs/tags/mytag-packed": b"b0931cadc54336e78a1d980420e3268903b57a50", }, ret.refs, ) # Hardcoding is not ideal, but we'll fix that some other day.. self.assertTrue( out.getvalue().startswith(b"PACK\x00\x00\x00\x02\x00\x00\x00\x07") ) def test_send_pack_without_changes(self): local = open_repo("a.git") self.addCleanup(tear_down_repo, local) target = open_repo("a.git") self.addCleanup(tear_down_repo, target) self.send_and_verify(b"master", local, target) def test_send_pack_with_changes(self): local = open_repo("a.git") self.addCleanup(tear_down_repo, local) target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) with Repo.init_bare(target_path) as target: self.send_and_verify(b"master", local, target) def test_get_refs(self): local = open_repo("refs.git") self.addCleanup(tear_down_repo, local) client = LocalGitClient() refs = client.get_refs(local.path) self.assertDictEqual(local.refs.as_dict(), refs) def send_and_verify(self, branch, local, target): """Send branch from local to remote repository and verify it worked.""" client = LocalGitClient() ref_name = b"refs/heads/" + branch result = client.send_pack( target.path, lambda _: {ref_name: local.refs[ref_name]}, local.generate_pack_data, ) self.assertEqual(local.refs[ref_name], result.refs[ref_name]) self.assertIs(None, result.agent) self.assertEqual({}, result.ref_status) obj_local = local.get_object(result.refs[ref_name]) obj_target = target.get_object(result.refs[ref_name]) self.assertEqual(obj_local, obj_target) class HttpGitClientTests(TestCase): def test_get_url(self): base_url = "https://github.com/jelmer/dulwich" path = "/jelmer/dulwich" c = HttpGitClient(base_url) url = c.get_url(path) self.assertEqual("https://github.com/jelmer/dulwich", url) def test_get_url_bytes_path(self): base_url = "https://github.com/jelmer/dulwich" path_bytes = b"/jelmer/dulwich" c = HttpGitClient(base_url) url = c.get_url(path_bytes) self.assertEqual("https://github.com/jelmer/dulwich", url) def test_get_url_with_username_and_passwd(self): base_url = "https://github.com/jelmer/dulwich" path = "/jelmer/dulwich" c = HttpGitClient(base_url, username="USERNAME", password="PASSWD") url = c.get_url(path) self.assertEqual("https://github.com/jelmer/dulwich", url) def test_init_username_passwd_set(self): url = "https://github.com/jelmer/dulwich" c = HttpGitClient(url, config=None, username="user", password="passwd") self.assertEqual("user", c._username) self.assertEqual("passwd", c._password) basic_auth = c.pool_manager.headers["authorization"] auth_string = "%s:%s" % ("user", "passwd") b64_credentials = base64.b64encode(auth_string.encode("latin1")) expected_basic_auth = "Basic %s" % b64_credentials.decode("latin1") self.assertEqual(basic_auth, expected_basic_auth) def test_init_no_username_passwd(self): url = "https://github.com/jelmer/dulwich" c = HttpGitClient(url, config=None) self.assertIs(None, c._username) self.assertIs(None, c._password) self.assertNotIn("authorization", c.pool_manager.headers) def test_from_parsedurl_on_url_with_quoted_credentials(self): original_username = "john|the|first" quoted_username = urlquote(original_username) original_password = "Ya#1$2%3" quoted_password = urlquote(original_password) url = "https://{username}:{password}@github.com/jelmer/dulwich".format( username=quoted_username, password=quoted_password ) c = HttpGitClient.from_parsedurl(urlparse(url)) self.assertEqual(original_username, c._username) self.assertEqual(original_password, c._password) basic_auth = c.pool_manager.headers["authorization"] auth_string = "%s:%s" % (original_username, original_password) b64_credentials = base64.b64encode(auth_string.encode("latin1")) expected_basic_auth = "Basic %s" % b64_credentials.decode("latin1") self.assertEqual(basic_auth, expected_basic_auth) def test_url_redirect_location(self): from urllib3.response import HTTPResponse test_data = { "https://gitlab.com/inkscape/inkscape/": { "redirect_url": "https://gitlab.com/inkscape/inkscape.git/", "refs_data": ( b"001e# service=git-upload-pack\n00000032" b"fb2bebf4919a011f0fd7cec085443d0031228e76 " b"HEAD\n0000" ), }, "https://github.com/jelmer/dulwich/": { "redirect_url": "https://github.com/jelmer/dulwich/", "refs_data": ( b"001e# service=git-upload-pack\n00000032" b"3ff25e09724aa4d86ea5bca7d5dd0399a3c8bfcf " b"HEAD\n0000" ), }, } tail = "info/refs?service=git-upload-pack" # we need to mock urllib3.PoolManager as this test will fail # otherwise without an active internet connection class PoolManagerMock: def __init__(self): self.headers = {} def request(self, method, url, fields=None, headers=None, redirect=True): base_url = url[: -len(tail)] redirect_base_url = test_data[base_url]["redirect_url"] redirect_url = redirect_base_url + tail headers = { "Content-Type": "application/x-git-upload-pack-advertisement" } body = test_data[base_url]["refs_data"] # urllib3 handles automatic redirection by default status = 200 request_url = redirect_url # simulate urllib3 behavior when redirect parameter is False if redirect is False: request_url = url if redirect_base_url != base_url: body = "" headers["location"] = redirect_url status = 301 return HTTPResponse( body=body, headers=headers, request_method=method, request_url=request_url, status=status, ) pool_manager = PoolManagerMock() for base_url in test_data.keys(): # instantiate HttpGitClient with mocked pool manager c = HttpGitClient(base_url, pool_manager=pool_manager, config=None) # call method that detects url redirection _, _, processed_url = c._discover_references(b"git-upload-pack", base_url) # send the same request as the method above without redirection resp = c.pool_manager.request("GET", base_url + tail, redirect=False) # check expected behavior of urllib3 redirect_location = resp.get_redirect_location() if resp.status == 200: self.assertFalse(redirect_location) if redirect_location: # check that url redirection has been correctly detected self.assertEqual(processed_url, redirect_location[: -len(tail)]) else: # check also the no redirection case self.assertEqual(processed_url, base_url) class TCPGitClientTests(TestCase): def test_get_url(self): host = "github.com" path = "/jelmer/dulwich" c = TCPGitClient(host) url = c.get_url(path) self.assertEqual("git://github.com/jelmer/dulwich", url) def test_get_url_with_port(self): host = "github.com" path = "/jelmer/dulwich" port = 9090 c = TCPGitClient(host, port=port) url = c.get_url(path) self.assertEqual("git://github.com:9090/jelmer/dulwich", url) class DefaultUrllib3ManagerTest(TestCase): def test_no_config(self): manager = default_urllib3_manager(config=None) self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED") def test_config_no_proxy(self): import urllib3 manager = default_urllib3_manager(config=ConfigDict()) self.assertNotIsInstance(manager, urllib3.ProxyManager) self.assertIsInstance(manager, urllib3.PoolManager) def test_config_no_proxy_custom_cls(self): import urllib3 class CustomPoolManager(urllib3.PoolManager): pass manager = default_urllib3_manager( config=ConfigDict(), pool_manager_cls=CustomPoolManager ) self.assertIsInstance(manager, CustomPoolManager) def test_config_ssl(self): config = ConfigDict() config.set(b"http", b"sslVerify", b"true") manager = default_urllib3_manager(config=config) self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED") def test_config_no_ssl(self): config = ConfigDict() config.set(b"http", b"sslVerify", b"false") manager = default_urllib3_manager(config=config) self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE") def test_config_proxy(self): import urllib3 config = ConfigDict() config.set(b"http", b"proxy", b"http://localhost:3128/") manager = default_urllib3_manager(config=config) self.assertIsInstance(manager, urllib3.ProxyManager) self.assertTrue(hasattr(manager, "proxy")) self.assertEqual(manager.proxy.scheme, "http") self.assertEqual(manager.proxy.host, "localhost") self.assertEqual(manager.proxy.port, 3128) def test_environment_proxy(self): import urllib3 config = ConfigDict() os.environ["http_proxy"] = "http://myproxy:8080" manager = default_urllib3_manager(config=config) self.assertIsInstance(manager, urllib3.ProxyManager) self.assertTrue(hasattr(manager, "proxy")) self.assertEqual(manager.proxy.scheme, "http") self.assertEqual(manager.proxy.host, "myproxy") self.assertEqual(manager.proxy.port, 8080) del os.environ["http_proxy"] def test_config_proxy_custom_cls(self): import urllib3 class CustomProxyManager(urllib3.ProxyManager): pass config = ConfigDict() config.set(b"http", b"proxy", b"http://localhost:3128/") manager = default_urllib3_manager( config=config, proxy_manager_cls=CustomProxyManager ) self.assertIsInstance(manager, CustomProxyManager) def test_config_no_verify_ssl(self): manager = default_urllib3_manager(config=None, cert_reqs="CERT_NONE") self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_NONE") class SubprocessSSHVendorTests(TestCase): def setUp(self): # Monkey Patch client subprocess popen self._orig_popen = dulwich.client.subprocess.Popen dulwich.client.subprocess.Popen = DummyPopen def tearDown(self): dulwich.client.subprocess.Popen = self._orig_popen def test_run_command_dashes(self): vendor = SubprocessSSHVendor() self.assertRaises( StrangeHostname, vendor.run_command, "--weird-host", "git-clone-url", ) def test_run_command_password(self): vendor = SubprocessSSHVendor() self.assertRaises( NotImplementedError, vendor.run_command, "host", "git-clone-url", password="12345", ) def test_run_command_password_and_privkey(self): vendor = SubprocessSSHVendor() self.assertRaises( NotImplementedError, vendor.run_command, "host", "git-clone-url", password="12345", key_filename="/tmp/id_rsa", ) def test_run_command_with_port_username_and_privkey(self): expected = [ "ssh", "-x", "-p", "2200", "-i", "/tmp/id_rsa", "user@host", "git-clone-url", ] vendor = SubprocessSSHVendor() command = vendor.run_command( "host", "git-clone-url", username="user", port="2200", key_filename="/tmp/id_rsa", ) args = command.proc.args self.assertListEqual(expected, args[0]) class PLinkSSHVendorTests(TestCase): def setUp(self): # Monkey Patch client subprocess popen self._orig_popen = dulwich.client.subprocess.Popen dulwich.client.subprocess.Popen = DummyPopen def tearDown(self): dulwich.client.subprocess.Popen = self._orig_popen def test_run_command_dashes(self): vendor = PLinkSSHVendor() self.assertRaises( StrangeHostname, vendor.run_command, "--weird-host", "git-clone-url", ) def test_run_command_password_and_privkey(self): vendor = PLinkSSHVendor() warnings.simplefilter("always", UserWarning) self.addCleanup(warnings.resetwarnings) warnings_list, restore_warnings = setup_warning_catcher() self.addCleanup(restore_warnings) command = vendor.run_command( "host", "git-clone-url", password="12345", key_filename="/tmp/id_rsa", ) expected_warning = UserWarning( "Invoking PLink with a password exposes the password in the " "process list." ) for w in warnings_list: if type(w) == type(expected_warning) and w.args == expected_warning.args: break else: raise AssertionError( "Expected warning %r not in %r" % (expected_warning, warnings_list) ) args = command.proc.args if sys.platform == "win32": binary = ["plink.exe", "-ssh"] else: binary = ["plink", "-ssh"] expected = binary + [ "-pw", "12345", "-i", "/tmp/id_rsa", "host", "git-clone-url", ] self.assertListEqual(expected, args[0]) def test_run_command_password(self): if sys.platform == "win32": binary = ["plink.exe", "-ssh"] else: binary = ["plink", "-ssh"] expected = binary + ["-pw", "12345", "host", "git-clone-url"] vendor = PLinkSSHVendor() warnings.simplefilter("always", UserWarning) self.addCleanup(warnings.resetwarnings) warnings_list, restore_warnings = setup_warning_catcher() self.addCleanup(restore_warnings) command = vendor.run_command("host", "git-clone-url", password="12345") expected_warning = UserWarning( "Invoking PLink with a password exposes the password in the " "process list." ) for w in warnings_list: if type(w) == type(expected_warning) and w.args == expected_warning.args: break else: raise AssertionError( "Expected warning %r not in %r" % (expected_warning, warnings_list) ) args = command.proc.args self.assertListEqual(expected, args[0]) def test_run_command_with_port_username_and_privkey(self): if sys.platform == "win32": binary = ["plink.exe", "-ssh"] else: binary = ["plink", "-ssh"] expected = binary + [ "-P", "2200", "-i", "/tmp/id_rsa", "user@host", "git-clone-url", ] vendor = PLinkSSHVendor() command = vendor.run_command( "host", "git-clone-url", username="user", port="2200", key_filename="/tmp/id_rsa", ) args = command.proc.args self.assertListEqual(expected, args[0]) class RsyncUrlTests(TestCase): def test_simple(self): self.assertEqual(parse_rsync_url("foo:bar/path"), (None, "foo", "bar/path")) self.assertEqual( parse_rsync_url("user@foo:bar/path"), ("user", "foo", "bar/path") ) def test_path(self): self.assertRaises(ValueError, parse_rsync_url, "/path") class CheckWantsTests(TestCase): def test_fine(self): check_wants( [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"], {b"refs/heads/blah": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"}, ) def test_missing(self): self.assertRaises( InvalidWants, check_wants, [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"], {b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262"}, ) def test_annotated(self): self.assertRaises( InvalidWants, check_wants, [b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"], { b"refs/heads/blah": b"3f3dc7a53fb752a6961d3a56683df46d4d3bf262", b"refs/heads/blah^{}": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262", }, ) class FetchPackResultTests(TestCase): def test_eq(self): self.assertEqual( FetchPackResult( {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"}, {}, b"user/agent", ), FetchPackResult( {b"refs/heads/master": b"2f3dc7a53fb752a6961d3a56683df46d4d3bf262"}, {}, b"user/agent", ), ) class GitCredentialStoreTests(TestCase): @classmethod def setUpClass(cls): with tempfile.NamedTemporaryFile(delete=False) as f: f.write(b"https://user:pass@example.org\n") cls.fname = f.name @classmethod def tearDownClass(cls): os.unlink(cls.fname) def test_nonmatching_scheme(self): self.assertEqual( get_credentials_from_store(b"http", b"example.org", fnames=[self.fname]), None, ) def test_nonmatching_hostname(self): self.assertEqual( get_credentials_from_store(b"https", b"noentry.org", fnames=[self.fname]), None, ) def test_match_without_username(self): self.assertEqual( get_credentials_from_store(b"https", b"example.org", fnames=[self.fname]), (b"user", b"pass"), ) def test_match_with_matching_username(self): self.assertEqual( get_credentials_from_store( b"https", b"example.org", b"user", fnames=[self.fname] ), (b"user", b"pass"), ) def test_no_match_with_nonmatching_username(self): self.assertEqual( get_credentials_from_store( b"https", b"example.org", b"otheruser", fnames=[self.fname] ), None, ) class RemoteErrorFromStderrTests(TestCase): def test_nothing(self): self.assertEqual(_remote_error_from_stderr(None), HangupException()) def test_error_line(self): b = BytesIO( b"""\ This is some random output. ERROR: This is the actual error with a tail """ ) self.assertEqual( _remote_error_from_stderr(b), GitProtocolError("This is the actual error"), ) def test_no_error_line(self): b = BytesIO( b"""\ This is output without an error line. And this line is just random noise, too. """ ) self.assertEqual( _remote_error_from_stderr(b), HangupException( [ b"This is output without an error line.", b"And this line is just random noise, too.", ] ), )