diff --git a/bin/dulwich b/bin/dulwich index 3fc4e215..537ed6f3 100755 --- a/bin/dulwich +++ b/bin/dulwich @@ -1,392 +1,399 @@ #!/usr/bin/python -u # # dulwich - Simple command-line interface to Dulwich # Copyright (C) 2008-2011 Jelmer Vernooij # vim: expandtab # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 # or (at your option) a later version of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Simple command-line interface to Dulwich> This is a very simple command-line wrapper for Dulwich. It is by no means intended to be a full-blown Git command-line interface but just a way to test Dulwich. """ import os import sys from getopt import getopt import optparse import signal def signal_int(signal, frame): sys.exit(1) signal.signal(signal.SIGINT, signal_int) from dulwich import porcelain from dulwich.client import get_transport_and_path from dulwich.errors import ApplyDeltaError from dulwich.index import Index from dulwich.pack import Pack, sha_to_hex from dulwich.patch import write_tree_diff from dulwich.repo import Repo def cmd_archive(args): opts, args = getopt(args, "", []) client, path = get_transport_and_path(args.pop(0)) location = args.pop(0) committish = args.pop(0) porcelain.archive(location, committish, outstream=sys.stdout, errstream=sys.stderr) def cmd_add(args): opts, args = getopt(args, "", []) porcelain.add(".", paths=args) def cmd_rm(args): opts, args = getopt(args, "", []) porcelain.rm(".", paths=args) def cmd_fetch_pack(args): opts, args = getopt(args, "", ["all"]) opts = dict(opts) client, path = get_transport_and_path(args.pop(0)) r = Repo(".") if "--all" in opts: determine_wants = r.object_store.determine_wants_all else: determine_wants = lambda x: [y for y in args if not y in r.object_store] client.fetch(path, r, determine_wants) def cmd_fetch(args): opts, args = getopt(args, "", []) opts = dict(opts) client, path = get_transport_and_path(args.pop(0)) r = Repo(".") if "--all" in opts: determine_wants = r.object_store.determine_wants_all refs = client.fetch(path, r, progress=sys.stdout.write) print("Remote refs:") for item in refs.items(): print("%s -> %s" % item) def cmd_log(args): opts, args = getopt(args, "", []) if len(args) > 0: path = args.pop(0) else: path = "." porcelain.log(repo=path, outstream=sys.stdout) def cmd_diff(args): opts, args = getopt(args, "", []) if args == []: print("Usage: dulwich diff COMMITID") sys.exit(1) r = Repo(".") commit_id = args[0] commit = r[commit_id] parent_commit = r[commit.parents[0]] write_tree_diff(sys.stdout, r.object_store, parent_commit.tree, commit.tree) def cmd_dump_pack(args): opts, args = getopt(args, "", []) if args == []: print("Usage: dulwich dump-pack FILENAME") sys.exit(1) basename, _ = os.path.splitext(args[0]) x = Pack(basename) print("Object names checksum: %s" % x.name()) print("Checksum: %s" % sha_to_hex(x.get_stored_checksum())) if not x.check(): print("CHECKSUM DOES NOT MATCH") print("Length: %d" % len(x)) for name in x: try: print("\t%s" % x[name]) except KeyError as k: print("\t%s: Unable to resolve base %s" % (name, k)) except ApplyDeltaError as e: print("\t%s: Unable to apply delta: %r" % (name, e)) def cmd_dump_index(args): opts, args = getopt(args, "", []) if args == []: print("Usage: dulwich dump-index FILENAME") sys.exit(1) filename = args[0] idx = Index(filename) for o in idx: print(o, idx[o]) def cmd_init(args): opts, args = getopt(args, "", ["bare"]) opts = dict(opts) if args == []: path = os.getcwd() else: path = args[0] porcelain.init(path, bare=("--bare" in opts)) def cmd_clone(args): opts, args = getopt(args, "", ["bare"]) opts = dict(opts) if args == []: print("usage: dulwich clone host:path [PATH]") sys.exit(1) source = args.pop(0) if len(args) > 0: target = args.pop(0) else: target = None porcelain.clone(source, target, bare=("--bare" in opts)) def cmd_commit(args): opts, args = getopt(args, "", ["message"]) opts = dict(opts) porcelain.commit(".", message=opts["--message"]) def cmd_commit_tree(args): opts, args = getopt(args, "", ["message"]) if args == []: print("usage: dulwich commit-tree tree") sys.exit(1) opts = dict(opts) porcelain.commit_tree(".", tree=args[0], message=opts["--message"]) def cmd_update_server_info(args): porcelain.update_server_info(".") def cmd_symbolic_ref(args): opts, args = getopt(args, "", ["ref-name", "force"]) if not args: print("Usage: dulwich symbolic-ref REF_NAME [--force]") sys.exit(1) ref_name = args.pop(0) porcelain.symbolic_ref(".", ref_name=ref_name, force='--force' in args) def cmd_show(args): opts, args = getopt(args, "", []) porcelain.show(".", args) def cmd_diff_tree(args): opts, args = getopt(args, "", []) if len(args) < 2: print("Usage: dulwich diff-tree OLD-TREE NEW-TREE") sys.exit(1) porcelain.diff_tree(".", args[0], args[1]) def cmd_rev_list(args): opts, args = getopt(args, "", []) if len(args) < 1: print('Usage: dulwich rev-list COMMITID...') sys.exit(1) porcelain.rev_list('.', args) def cmd_tag(args): opts, args = getopt(args, '', []) if len(args) < 2: print('Usage: dulwich tag NAME') sys.exit(1) porcelain.tag('.', args[0]) +def cmd_repack(args): + opts, args = getopt(args, "", []) + opts = dict(opts) + porcelain.repack('.') + + def cmd_reset(args): opts, args = getopt(args, "", ["hard", "soft", "mixed"]) opts = dict(opts) mode = "" if "--hard" in opts: mode = "hard" elif "--soft" in opts: mode = "soft" elif "--mixed" in opts: mode = "mixed" porcelain.reset('.', mode=mode, *args) def cmd_daemon(args): from dulwich import log_utils from dulwich.protocol import TCP_GIT_PORT parser = optparse.OptionParser() parser.add_option("-l", "--listen_address", dest="listen_address", default="localhost", help="Binding IP address.") parser.add_option("-p", "--port", dest="port", type=int, default=TCP_GIT_PORT, help="Binding TCP port.") options, args = parser.parse_args(args) log_utils.default_logging_config() if len(args) >= 1: gitdir = args[0] else: gitdir = '.' from dulwich import porcelain porcelain.daemon(gitdir, address=options.listen_address, port=options.port) def cmd_web_daemon(args): from dulwich import log_utils parser = optparse.OptionParser() parser.add_option("-l", "--listen_address", dest="listen_address", default="", help="Binding IP address.") parser.add_option("-p", "--port", dest="port", type=int, default=8000, help="Binding TCP port.") options, args = parser.parse_args(args) log_utils.default_logging_config() if len(args) >= 1: gitdir = args[0] else: gitdir = '.' from dulwich import porcelain porcelain.web_daemon(gitdir, address=options.listen_address, port=options.port) def cmd_receive_pack(args): parser = optparse.OptionParser() options, args = parser.parse_args(args) if len(args) >= 1: gitdir = args[0] else: gitdir = '.' porcelain.receive_pack(gitdir) def cmd_upload_pack(args): parser = optparse.OptionParser() options, args = parser.parse_args(args) if len(args) >= 1: gitdir = args[0] else: gitdir = '.' porcelain.upload_pack(gitdir) def cmd_status(args): parser = optparse.OptionParser() options, args = parser.parse_args(args) if len(args) >= 1: gitdir = args[0] else: gitdir = '.' status = porcelain.status(gitdir) if status.staged: sys.stdout.write("Changes to be committed:\n\n") for kind, names in status.staged.items(): for name in names: sys.stdout.write("\t%s: %s\n" % (kind, name)) sys.stdout.write("\n") if status.unstaged: sys.stdout.write("Changes not staged for commit:\n\n") for name in status.unstaged: sys.stdout.write("\t%s\n" % name.decode(sys.getfilesystemencoding())) sys.stdout.write("\n") if status.untracked: sys.stdout.write("Untracked files:\n\n") for name in status.untracked: sys.stdout.write("\t%s\n" % name) sys.stdout.write("\n") def cmd_ls_remote(args): opts, args = getopt(args, '', []) if len(args) < 1: print('Usage: dulwich ls-remote URL') sys.exit(1) refs = porcelain.ls_remote(args[0]) for ref in sorted(refs): sys.stdout.write("%s\t%s\n" % (ref, refs[ref])) commands = { "add": cmd_add, "archive": cmd_archive, "clone": cmd_clone, "commit": cmd_commit, "commit-tree": cmd_commit_tree, "daemon": cmd_daemon, "diff": cmd_diff, "diff-tree": cmd_diff_tree, "dump-pack": cmd_dump_pack, "dump-index": cmd_dump_index, "fetch-pack": cmd_fetch_pack, "fetch": cmd_fetch, "init": cmd_init, "log": cmd_log, "ls-remote": cmd_ls_remote, "receive-pack": cmd_receive_pack, + "repack": cmd_repack, "reset": cmd_reset, "rev-list": cmd_rev_list, "rm": cmd_rm, "show": cmd_show, "status": cmd_status, "symbolic-ref": cmd_symbolic_ref, "tag": cmd_tag, "update-server-info": cmd_update_server_info, "upload-pack": cmd_upload_pack, "web-daemon": cmd_web_daemon, } if len(sys.argv) < 2: print("Usage: %s <%s> [OPTIONS...]" % (sys.argv[0], "|".join(commands.keys()))) sys.exit(1) cmd = sys.argv[1] if not cmd in commands: print("No such subcommand: %s" % cmd) sys.exit(1) commands[cmd](sys.argv[2:]) diff --git a/dulwich/porcelain.py b/dulwich/porcelain.py index 8625b2ba..e65ff58c 100644 --- a/dulwich/porcelain.py +++ b/dulwich/porcelain.py @@ -1,814 +1,825 @@ # porcelain.py -- Porcelain-like layer on top of Dulwich # Copyright (C) 2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # or (at your option) a later version of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Simple wrapper that provides porcelain-like functions on top of Dulwich. Currently implemented: * archive * add * branch{_create,_delete,_list} * clone * commit * commit-tree * daemon * diff-tree * fetch * init * ls-remote * pull * push * rm * receive-pack * reset * rev-list * tag{_create,_delete,_list} * upload-pack * update-server-info * status * symbolic-ref These functions are meant to behave similarly to the git subcommands. Differences in behaviour are considered bugs. """ __docformat__ = 'restructuredText' from collections import namedtuple from contextlib import ( closing, contextmanager, ) import os import sys import time from dulwich.client import ( get_transport_and_path, SubprocessGitClient, ) from dulwich.errors import ( SendPackError, UpdateRefsError, ) from dulwich.index import get_unstaged_changes from dulwich.objects import ( Commit, Tag, parse_timezone, ) from dulwich.objectspec import ( parse_object, parse_reftuples, ) from dulwich.patch import write_tree_diff from dulwich.protocol import Protocol from dulwich.repo import (BaseRepo, Repo) from dulwich.server import ( FileSystemBackend, TCPGitServer, ReceivePackHandler, UploadPackHandler, update_server_info as server_update_server_info, ) # Module level tuple definition for status output GitStatus = namedtuple('GitStatus', 'staged unstaged untracked') def encode_path(path): """Encode a path as bytestring.""" # TODO(jelmer): Use something other than ascii? if not isinstance(path, bytes): path = path.encode('ascii') return path def open_repo(path_or_repo): """Open an argument that can be a repository or a path for a repository.""" if isinstance(path_or_repo, BaseRepo): return path_or_repo return Repo(path_or_repo) @contextmanager def _noop_context_manager(obj): """Context manager that has the same api as closing but does nothing.""" yield obj def open_repo_closing(path_or_repo): """Open an argument that can be a repository or a path for a repository. returns a context manager that will close the repo on exit if the argument is a path, else does nothing if the argument is a repo. """ if isinstance(path_or_repo, BaseRepo): return _noop_context_manager(path_or_repo) return closing(Repo(path_or_repo)) def archive(path, committish=None, outstream=sys.stdout, errstream=sys.stderr): """Create an archive. :param path: Path of repository for which to generate an archive. :param committish: Commit SHA1 or ref to use :param outstream: Output stream (defaults to stdout) :param errstream: Error stream (defaults to stderr) """ client = SubprocessGitClient() if committish is None: committish = "HEAD" if not isinstance(path, bytes): path = path.encode(sys.getfilesystemencoding()) # TODO(jelmer): This invokes C git; this introduces a dependency. # Instead, dulwich should have its own archiver implementation. client.archive(path, committish, outstream.write, errstream.write, errstream.write) def update_server_info(repo="."): """Update server info files for a repository. :param repo: path to the repository """ with open_repo_closing(repo) as r: server_update_server_info(r) def symbolic_ref(repo, ref_name, force=False): """Set git symbolic ref into HEAD. :param repo: path to the repository :param ref_name: short name of the new ref :param force: force settings without checking if it exists in refs/heads """ with open_repo_closing(repo) as repo_obj: ref_path = b'refs/heads/' + ref_name if not force and ref_path not in repo_obj.refs.keys(): raise ValueError('fatal: ref `%s` is not a ref' % ref_name) repo_obj.refs.set_symbolic_ref(b'HEAD', ref_path) def commit(repo=".", message=None, author=None, committer=None): """Create a new commit. :param repo: Path to repository :param message: Optional commit message :param author: Optional author name and email :param committer: Optional committer name and email :return: SHA1 of the new commit """ # FIXME: Support --all argument # FIXME: Support --signoff argument with open_repo_closing(repo) as r: return r.do_commit(message=message, author=author, committer=committer) def commit_tree(repo, tree, message=None, author=None, committer=None): """Create a new commit object. :param repo: Path to repository :param tree: An existing tree object :param author: Optional author name and email :param committer: Optional committer name and email """ with open_repo_closing(repo) as r: return r.do_commit(message=message, tree=tree, committer=committer, author=author) def init(path=".", bare=False): """Create a new git repository. :param path: Path to repository. :param bare: Whether to create a bare repository. :return: A Repo instance """ if not os.path.exists(path): os.mkdir(path) if bare: return Repo.init_bare(path) else: return Repo.init(path) def clone(source, target=None, bare=False, checkout=None, errstream=sys.stdout, outstream=None): """Clone a local or remote git repository. :param source: Path or URL for source repository :param target: Path to target repository (optional) :param bare: Whether or not to create a bare repository :param errstream: Optional stream to write progress to :param outstream: Optional stream to write progress to (deprecated) :return: The new repository """ if outstream is not None: import warnings warnings.warn("outstream= has been deprecated in favour of errstream=.", DeprecationWarning, stacklevel=3) errstream = outstream if checkout is None: checkout = (not bare) if checkout and bare: raise ValueError("checkout and bare are incompatible") client, host_path = get_transport_and_path(source) if target is None: target = host_path.split("/")[-1] if not os.path.exists(target): os.mkdir(target) if bare: r = Repo.init_bare(target) else: r = Repo.init(target) try: remote_refs = client.fetch(host_path, r, determine_wants=r.object_store.determine_wants_all, progress=errstream.write) r[b"HEAD"] = remote_refs[b"HEAD"] if checkout: errstream.write(b'Checking out HEAD\n') r.reset_index() except: r.close() raise return r def add(repo=".", paths=None): """Add files to the staging area. :param repo: Repository for the files :param paths: Paths to add. No value passed stages all modified files. """ # FIXME: Support patterns, directories. with open_repo_closing(repo) as r: if not paths: # If nothing is specified, add all non-ignored files. paths = [] for dirpath, dirnames, filenames in os.walk(r.path): # Skip .git and below. if '.git' in dirnames: dirnames.remove('.git') for filename in filenames: paths.append(os.path.join(dirpath[len(r.path)+1:], filename)) r.stage(paths) def rm(repo=".", paths=None): """Remove files from the staging area. :param repo: Repository for the files :param paths: Paths to remove """ with open_repo_closing(repo) as r: index = r.open_index() for p in paths: del index[p.encode(sys.getfilesystemencoding())] index.write() def commit_decode(commit, contents, default_encoding='utf-8'): if commit.encoding is not None: return contents.decode(commit.encoding, "replace") return contents.decode(default_encoding, "replace") def print_commit(commit, decode, outstream=sys.stdout): """Write a human-readable commit log entry. :param commit: A `Commit` object :param outstream: A stream file to write to """ outstream.write("-" * 50 + "\n") outstream.write("commit: " + commit.id.decode('ascii') + "\n") if len(commit.parents) > 1: outstream.write("merge: " + "...".join([c.decode('ascii') for c in commit.parents[1:]]) + "\n") outstream.write("author: " + decode(commit.author) + "\n") outstream.write("committer: " + decode(commit.committer) + "\n") outstream.write("\n") outstream.write(decode(commit.message) + "\n") outstream.write("\n") def print_tag(tag, decode, outstream=sys.stdout): """Write a human-readable tag. :param tag: A `Tag` object :param decode: Function for decoding bytes to unicode string :param outstream: A stream to write to """ outstream.write("Tagger: " + decode(tag.tagger) + "\n") outstream.write("Date: " + decode(tag.tag_time) + "\n") outstream.write("\n") outstream.write(decode(tag.message) + "\n") outstream.write("\n") def show_blob(repo, blob, decode, outstream=sys.stdout): """Write a blob to a stream. :param repo: A `Repo` object :param blob: A `Blob` object :param decode: Function for decoding bytes to unicode string :param outstream: A stream file to write to """ outstream.write(decode(blob.data)) def show_commit(repo, commit, decode, outstream=sys.stdout): """Show a commit to a stream. :param repo: A `Repo` object :param commit: A `Commit` object :param decode: Function for decoding bytes to unicode string :param outstream: Stream to write to """ print_commit(commit, decode=decode, outstream=outstream) parent_commit = repo[commit.parents[0]] write_tree_diff(outstream, repo.object_store, parent_commit.tree, commit.tree) def show_tree(repo, tree, decode, outstream=sys.stdout): """Print a tree to a stream. :param repo: A `Repo` object :param tree: A `Tree` object :param decode: Function for decoding bytes to unicode string :param outstream: Stream to write to """ for n in tree: outstream.write(decode(n) + "\n") def show_tag(repo, tag, decode, outstream=sys.stdout): """Print a tag to a stream. :param repo: A `Repo` object :param tag: A `Tag` object :param decode: Function for decoding bytes to unicode string :param outstream: Stream to write to """ print_tag(tag, decode, outstream) show_object(repo, repo[tag.object[1]], outstream) def show_object(repo, obj, decode, outstream): return { b"tree": show_tree, b"blob": show_blob, b"commit": show_commit, b"tag": show_tag, }[obj.type_name](repo, obj, decode, outstream) def log(repo=".", outstream=sys.stdout, max_entries=None): """Write commit logs. :param repo: Path to repository :param outstream: Stream to write log output to :param max_entries: Optional maximum number of entries to display """ with open_repo_closing(repo) as r: walker = r.get_walker(max_entries=max_entries) for entry in walker: decode = lambda x: commit_decode(entry.commit, x) print_commit(entry.commit, decode, outstream) # TODO(jelmer): better default for encoding? def show(repo=".", objects=None, outstream=sys.stdout, default_encoding='utf-8'): """Print the changes in a commit. :param repo: Path to repository :param objects: Objects to show (defaults to [HEAD]) :param outstream: Stream to write to :param default_encoding: Default encoding to use if none is set in the commit """ if objects is None: objects = ["HEAD"] if not isinstance(objects, list): objects = [objects] with open_repo_closing(repo) as r: for objectish in objects: o = parse_object(r, objectish) if isinstance(o, Commit): decode = lambda x: commit_decode(o, x, default_encoding) else: decode = lambda x: x.decode(default_encoding) show_object(r, o, decode, outstream) def diff_tree(repo, old_tree, new_tree, outstream=sys.stdout): """Compares the content and mode of blobs found via two tree objects. :param repo: Path to repository :param old_tree: Id of old tree :param new_tree: Id of new tree :param outstream: Stream to write to """ with open_repo_closing(repo) as r: write_tree_diff(outstream, r.object_store, old_tree, new_tree) def rev_list(repo, commits, outstream=sys.stdout): """Lists commit objects in reverse chronological order. :param repo: Path to repository :param commits: Commits over which to iterate :param outstream: Stream to write to """ with open_repo_closing(repo) as r: for entry in r.get_walker(include=[r[c].id for c in commits]): outstream.write(entry.commit.id + b"\n") def tag(*args, **kwargs): import warnings warnings.warn("tag has been deprecated in favour of tag_create.", DeprecationWarning) return tag_create(*args, **kwargs) def tag_create(repo, tag, author=None, message=None, annotated=False, objectish="HEAD", tag_time=None, tag_timezone=None): """Creates a tag in git via dulwich calls: :param repo: Path to repository :param tag: tag string :param author: tag author (optional, if annotated is set) :param message: tag message (optional) :param annotated: whether to create an annotated tag :param objectish: object the tag should point at, defaults to HEAD :param tag_time: Optional time for annotated tag :param tag_timezone: Optional timezone for annotated tag """ with open_repo_closing(repo) as r: object = parse_object(r, objectish) if annotated: # Create the tag object tag_obj = Tag() if author is None: # TODO(jelmer): Don't use repo private method. author = r._get_user_identity() tag_obj.tagger = author tag_obj.message = message tag_obj.name = tag tag_obj.object = (type(object), object.id) tag_obj.tag_time = tag_time if tag_time is None: tag_time = int(time.time()) if tag_timezone is None: # TODO(jelmer) Use current user timezone rather than UTC tag_timezone = 0 elif isinstance(tag_timezone, str): tag_timezone = parse_timezone(tag_timezone) tag_obj.tag_timezone = tag_timezone r.object_store.add_object(tag_obj) tag_id = tag_obj.id else: tag_id = object.id r.refs[b'refs/tags/' + tag] = tag_id def list_tags(*args, **kwargs): import warnings warnings.warn("list_tags has been deprecated in favour of tag_list.", DeprecationWarning) return tag_list(*args, **kwargs) def tag_list(repo, outstream=sys.stdout): """List all tags. :param repo: Path to repository :param outstream: Stream to write tags to """ with open_repo_closing(repo) as r: tags = list(r.refs.as_dict(b"refs/tags")) tags.sort() return tags def tag_delete(repo, name): """Remove a tag. :param repo: Path to repository :param name: Name of tag to remove """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected tag name type %r" % name) for name in names: del r.refs[b"refs/tags/" + name] def reset(repo, mode, committish="HEAD"): """Reset current HEAD to the specified state. :param repo: Path to repository :param mode: Mode ("hard", "soft", "mixed") """ if mode != "hard": raise ValueError("hard is the only mode currently supported") with open_repo_closing(repo) as r: tree = r[committish].tree r.reset_index() def push(repo, remote_location, refspecs=None, outstream=sys.stdout, errstream=sys.stderr): """Remote push with dulwich via dulwich.client :param repo: Path to repository :param remote_location: Location of the remote :param refspecs: relative path to the refs to push to remote :param outstream: A stream file to write output :param errstream: A stream file to write errors """ # Open the repo with open_repo_closing(repo) as r: # Get the client and path client, path = get_transport_and_path(remote_location) selected_refs = [] def update_refs(refs): selected_refs.extend(parse_reftuples(r.refs, refs, refspecs)) # TODO: Handle selected_refs == {None: None} for (lh, rh, force) in selected_refs: if lh is None: del refs[rh] else: refs[rh] = r.refs[lh] return refs err_encoding = getattr(errstream, 'encoding', 'utf-8') remote_location_bytes = remote_location.encode(err_encoding) try: client.send_pack(path, update_refs, r.object_store.generate_pack_contents, progress=errstream.write) errstream.write(b"Push to " + remote_location_bytes + b" successful.\n") except (UpdateRefsError, SendPackError) as e: errstream.write(b"Push to " + remote_location_bytes + b" failed -> " + e.message.encode(err_encoding) + b"\n") def pull(repo, remote_location, refspecs=None, outstream=sys.stdout, errstream=sys.stderr): """Pull from remote via dulwich.client :param repo: Path to repository :param remote_location: Location of the remote :param refspec: refspecs to fetch :param outstream: A stream file to write to output :param errstream: A stream file to write to errors """ # Open the repo with open_repo_closing(repo) as r: selected_refs = [] def determine_wants(remote_refs): selected_refs.extend(parse_reftuples(remote_refs, r.refs, refspecs)) return [remote_refs[lh] for (lh, rh, force) in selected_refs] client, path = get_transport_and_path(remote_location) remote_refs = client.fetch(path, r, progress=errstream.write, determine_wants=determine_wants) for (lh, rh, force) in selected_refs: r.refs[rh] = remote_refs[lh] if selected_refs: r[b'HEAD'] = remote_refs[selected_refs[0][1]] # Perform 'git checkout .' - syncs staged changes tree = r[b"HEAD"].tree r.reset_index() def status(repo="."): """Returns staged, unstaged, and untracked changes relative to the HEAD. :param repo: Path to repository or repository object :return: GitStatus tuple, staged - list of staged paths (diff index/HEAD) unstaged - list of unstaged paths (diff index/working-tree) untracked - list of untracked, un-ignored & non-.git paths """ with open_repo_closing(repo) as r: # 1. Get status of staged tracked_changes = get_tree_changes(r) # 2. Get status of unstaged unstaged_changes = list(get_unstaged_changes(r.open_index(), r.path)) # TODO - Status of untracked - add untracked changes, need gitignore. untracked_changes = [] return GitStatus(tracked_changes, unstaged_changes, untracked_changes) def get_tree_changes(repo): """Return add/delete/modify changes to tree by comparing index to HEAD. :param repo: repo path or object :return: dict with lists for each type of change """ with open_repo_closing(repo) as r: index = r.open_index() # Compares the Index to the HEAD & determines changes # Iterate through the changes and report add/delete/modify # TODO: call out to dulwich.diff_tree somehow. tracked_changes = { 'add': [], 'delete': [], 'modify': [], } for change in index.changes_from_tree(r.object_store, r[b'HEAD'].tree): if not change[0][0]: tracked_changes['add'].append(change[0][1]) elif not change[0][1]: tracked_changes['delete'].append(change[0][0]) elif change[0][0] == change[0][1]: tracked_changes['modify'].append(change[0][0]) else: raise AssertionError('git mv ops not yet supported') return tracked_changes def daemon(path=".", address=None, port=None): """Run a daemon serving Git requests over TCP/IP. :param path: Path to the directory to serve. :param address: Optional address to listen on (defaults to ::) :param port: Optional port to listen on (defaults to TCP_GIT_PORT) """ # TODO(jelmer): Support git-daemon-export-ok and --export-all. backend = FileSystemBackend(path) server = TCPGitServer(backend, address, port) server.serve_forever() def web_daemon(path=".", address=None, port=None): """Run a daemon serving Git requests over HTTP. :param path: Path to the directory to serve :param address: Optional address to listen on (defaults to ::) :param port: Optional port to listen on (defaults to 80) """ from dulwich.web import ( make_wsgi_chain, make_server, WSGIRequestHandlerLogger, WSGIServerLogger) backend = FileSystemBackend(path) app = make_wsgi_chain(backend) server = make_server(address, port, app, handler_class=WSGIRequestHandlerLogger, server_class=WSGIServerLogger) server.serve_forever() def upload_pack(path=".", inf=None, outf=None): """Upload a pack file after negotiating its contents using smart protocol. :param path: Path to the repository :param inf: Input stream to communicate with client :param outf: Output stream to communicate with client """ if outf is None: outf = getattr(sys.stdout, 'buffer', sys.stdout) if inf is None: inf = getattr(sys.stdin, 'buffer', sys.stdin) backend = FileSystemBackend(path) def send_fn(data): outf.write(data) outf.flush() proto = Protocol(inf.read, send_fn) handler = UploadPackHandler(backend, [path], proto) # FIXME: Catch exceptions and write a single-line summary to outf. handler.handle() return 0 def receive_pack(path=".", inf=None, outf=None): """Receive a pack file after negotiating its contents using smart protocol. :param path: Path to the repository :param inf: Input stream to communicate with client :param outf: Output stream to communicate with client """ if outf is None: outf = getattr(sys.stdout, 'buffer', sys.stdout) if inf is None: inf = getattr(sys.stdin, 'buffer', sys.stdin) backend = FileSystemBackend(path) def send_fn(data): outf.write(data) outf.flush() proto = Protocol(inf.read, send_fn) handler = ReceivePackHandler(backend, [path], proto) # FIXME: Catch exceptions and write a single-line summary to outf. handler.handle() return 0 def branch_delete(repo, name): """Delete a branch. :param repo: Path to the repository :param name: Name of the branch """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected branch name type %r" % name) for name in names: del r.refs[b"refs/heads/" + name] def branch_create(repo, name, objectish=None, force=False): """Create a branch. :param repo: Path to the repository :param name: Name of the new branch :param objectish: Target object to point new branch at (defaults to HEAD) :param force: Force creation of branch, even if it already exists """ with open_repo_closing(repo) as r: if isinstance(name, bytes): names = [name] elif isinstance(name, list): names = name else: raise TypeError("Unexpected branch name type %r" % name) if objectish is None: objectish = "HEAD" object = parse_object(r, objectish) refname = b"refs/heads/" + name if refname in r.refs and not force: raise KeyError("Branch with name %s already exists." % name) r.refs[refname] = object.id def branch_list(repo): """List all branches. :param repo: Path to the repository """ with open_repo_closing(repo) as r: return r.refs.keys(base=b"refs/heads/") def fetch(repo, remote_location, outstream=sys.stdout, errstream=sys.stderr): """Fetch objects from a remote server. :param repo: Path to the repository :param remote_location: String identifying a remote server :param outstream: Output stream (defaults to stdout) :param errstream: Error stream (defaults to stderr) :return: Dictionary with refs on the remote """ with open_repo_closing(repo) as r: client, path = get_transport_and_path(remote_location) remote_refs = client.fetch(path, r, progress=errstream.write) return remote_refs def ls_remote(remote): client, host_path = get_transport_and_path(remote) return client.get_refs(encode_path(host_path)) + + +def repack(repo): + """Repack loose files in a repository. + + Currently this only packs loose objects. + + :param repo: Path to the repository + """ + with open_repo_closing(repo) as r: + r.object_store.pack_loose_objects() diff --git a/dulwich/tests/test_porcelain.py b/dulwich/tests/test_porcelain.py index 5aece7a7..8aa0a5e3 100644 --- a/dulwich/tests/test_porcelain.py +++ b/dulwich/tests/test_porcelain.py @@ -1,742 +1,755 @@ # test_porcelain.py -- porcelain tests # Copyright (C) 2013 Jelmer Vernooij # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; version 2 # of the License or (at your option) a later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. """Tests for dulwich.porcelain.""" from contextlib import closing from io import BytesIO try: from StringIO import StringIO except ImportError: from io import StringIO import os import shutil import tarfile import tempfile from dulwich import porcelain from dulwich.diff_tree import tree_changes from dulwich.objects import ( Blob, Tag, Tree, ) from dulwich.repo import Repo from dulwich.tests import ( TestCase, ) from dulwich.tests.compat.utils import require_git_version from dulwich.tests.utils import ( build_commit_graph, make_object, ) class PorcelainTestCase(TestCase): def setUp(self): super(PorcelainTestCase, self).setUp() repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) self.repo = Repo.init(repo_dir) def tearDown(self): super(PorcelainTestCase, self).tearDown() self.repo.close() class ArchiveTests(PorcelainTestCase): """Tests for the archive command.""" def test_simple(self): # TODO(jelmer): Remove this once dulwich has its own implementation of archive. require_git_version((1, 5, 0)) c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/master"] = c3.id out = BytesIO() err = BytesIO() porcelain.archive(self.repo.path, b"refs/heads/master", outstream=out, errstream=err) self.assertEqual(b"", err.getvalue()) tf = tarfile.TarFile(fileobj=out) self.addCleanup(tf.close) self.assertEqual([], tf.getnames()) class UpdateServerInfoTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id porcelain.update_server_info(self.repo.path) self.assertTrue(os.path.exists(os.path.join(self.repo.controldir(), 'info', 'refs'))) class CommitTests(PorcelainTestCase): def test_custom_author(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"refs/heads/foo"] = c3.id sha = porcelain.commit(self.repo.path, message=b"Some message", author=b"Joe ", committer=b"Bob ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class CloneTests(PorcelainTestCase): def test_simple_local(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, checkout=False, errstream=errstream) self.assertEqual(r.path, target_path) self.assertEqual(Repo(target_path).head(), c3.id) self.assertTrue(b'f1' not in os.listdir(target_path)) self.assertTrue(b'f2' not in os.listdir(target_path)) def test_simple_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) with closing(porcelain.clone(self.repo.path, target_path, checkout=True, errstream=errstream)) as r: self.assertEqual(r.path, target_path) with closing(Repo(target_path)) as r: self.assertEqual(r.head(), c3.id) self.assertTrue('f1' in os.listdir(target_path)) self.assertTrue('f2' in os.listdir(target_path)) def test_bare_local_with_checkout(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1], [2, 1], [3, 1, 2]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)], 2: [(b'f1', f1_1), (b'f2', f1_1)], 3: [(b'f1', f1_1), (b'f2', f1_1)], } c1, c2, c3 = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c3.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) r = porcelain.clone(self.repo.path, target_path, bare=True, errstream=errstream) self.assertEqual(r.path, target_path) self.assertEqual(Repo(target_path).head(), c3.id) self.assertFalse(b'f1' in os.listdir(target_path)) self.assertFalse(b'f2' in os.listdir(target_path)) def test_no_checkout_with_bare(self): f1_1 = make_object(Blob, data=b'f1') commit_spec = [[1]] trees = {1: [(b'f1', f1_1), (b'f2', f1_1)]} (c1, ) = build_commit_graph(self.repo.object_store, commit_spec, trees) self.repo.refs[b"refs/heads/master"] = c1.id target_path = tempfile.mkdtemp() errstream = BytesIO() self.addCleanup(shutil.rmtree, target_path) self.assertRaises(ValueError, porcelain.clone, self.repo.path, target_path, checkout=True, bare=True, errstream=errstream) class InitTests(TestCase): def test_non_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir) def test_bare(self): repo_dir = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, repo_dir) porcelain.init(repo_dir, bare=True) class AddTests(PorcelainTestCase): def test_add_default_paths(self): # create a file for initial commit with open(os.path.join(self.repo.path, 'blah'), 'w') as f: f.write("\n") porcelain.add(repo=self.repo.path, paths=['blah']) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Add a second test file and a file in a directory with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("\n") os.mkdir(os.path.join(self.repo.path, 'adir')) with open(os.path.join(self.repo.path, 'adir', 'afile'), 'w') as f: f.write("\n") porcelain.add(self.repo.path) # Check that foo was added and nothing in .git was modified index = self.repo.open_index() self.assertEqual(sorted(index), [b'adir/afile', b'blah', b'foo']) def test_add_file(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) class RemoveTests(PorcelainTestCase): def test_remove_file(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) porcelain.rm(self.repo.path, paths=["foo"]) class LogTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream) self.assertEqual(3, outstream.getvalue().count("-" * 50)) def test_max_entries(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.log(self.repo.path, outstream=outstream, max_entries=1) self.assertEqual(1, outstream.getvalue().count("-" * 50)) class ShowTests(PorcelainTestCase): def test_nolist(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=c3.id, outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = StringIO() porcelain.show(self.repo.path, objects=[c3.id], outstream=outstream) self.assertTrue(outstream.getvalue().startswith("-" * 50)) def test_blob(self): b = Blob.from_string(b"The Foo\n") self.repo.object_store.add_object(b) outstream = StringIO() porcelain.show(self.repo.path, objects=[b.id], outstream=outstream) self.assertEqual(outstream.getvalue(), "The Foo\n") class SymbolicRefTests(PorcelainTestCase): def test_set_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id self.assertRaises(ValueError, porcelain.symbolic_ref, self.repo.path, b'foobar') def test_set_force_wrong_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'force_foobar', force=True) #test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/force_foobar\n') def test_set_symbolic_ref(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.symbolic_ref(self.repo.path, b'master') def test_set_symbolic_ref_other_than_master(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]], attrs=dict(refs='develop')) self.repo.refs[b"HEAD"] = c3.id self.repo.refs[b"refs/heads/develop"] = c3.id porcelain.symbolic_ref(self.repo.path, b'develop') #test if we actually changed the file with self.repo.get_named_file('HEAD') as f: new_ref = f.read() self.assertEqual(new_ref, b'ref: refs/heads/develop\n') class DiffTreeTests(PorcelainTestCase): def test_empty(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id outstream = BytesIO() porcelain.diff_tree(self.repo.path, c2.tree, c3.tree, outstream=outstream) self.assertEqual(outstream.getvalue(), b"") class CommitTreeTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) b = Blob() b.data = b"foo the bar" t = Tree() t.add(b"somename", 0o100644, b.id) self.repo.object_store.add_object(t) self.repo.object_store.add_object(b) sha = porcelain.commit_tree( self.repo.path, t.id, message=b"Withcommit.", author=b"Joe ", committer=b"Jane ") self.assertTrue(isinstance(sha, bytes)) self.assertEqual(len(sha), 40) class RevListTests(PorcelainTestCase): def test_simple(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) outstream = BytesIO() porcelain.rev_list( self.repo.path, [c3.id], outstream=outstream) self.assertEqual( c3.id + b"\n" + c2.id + b"\n" + c1.id + b"\n", outstream.getvalue()) class TagCreateTests(PorcelainTestCase): def test_annotated(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", b'foo ', b'bar', annotated=True) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) tag = self.repo[b'refs/tags/tryme'] self.assertTrue(isinstance(tag, Tag)) self.assertEqual(b"foo ", tag.tagger) self.assertEqual(b"bar", tag.message) def test_unannotated(self): c1, c2, c3 = build_commit_graph(self.repo.object_store, [[1], [2, 1], [3, 1, 2]]) self.repo.refs[b"HEAD"] = c3.id porcelain.tag_create(self.repo.path, b"tryme", annotated=False) tags = self.repo.refs.as_dict(b"refs/tags") self.assertEqual(list(tags.keys()), [b"tryme"]) self.repo[b'refs/tags/tryme'] self.assertEqual(list(tags.values()), [self.repo.head()]) class TagListTests(PorcelainTestCase): def test_empty(self): tags = porcelain.tag_list(self.repo.path) self.assertEqual([], tags) def test_simple(self): self.repo.refs[b"refs/tags/foo"] = b"aa" * 20 self.repo.refs[b"refs/tags/bar/bla"] = b"bb" * 20 tags = porcelain.tag_list(self.repo.path) self.assertEqual([b"bar/bla", b"foo"], tags) class TagDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.tag_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.tag_list(self.repo)) porcelain.tag_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.tag_list(self.repo)) class ResetTests(PorcelainTestCase): def test_hard_head(self): with open(os.path.join(self.repo.path, 'foo'), 'w') as f: f.write("BAR") porcelain.add(self.repo.path, paths=["foo"]) porcelain.commit(self.repo.path, message=b"Some message", committer=b"Jane ", author=b"John ") with open(os.path.join(self.repo.path, 'foo'), 'wb') as f: f.write(b"OOH") porcelain.reset(self.repo, "hard", b"HEAD") index = self.repo.open_index() changes = list(tree_changes(self.repo, index.commit(self.repo.object_store), self.repo[b'HEAD'].tree)) self.assertEqual([], changes) class PushTests(PorcelainTestCase): def test_simple(self): """ Basic test of porcelain push where self.repo is the remote. First clone the remote, commit a file to the clone, then push the changes back to the remote. """ outstream = BytesIO() errstream = BytesIO() porcelain.commit(repo=self.repo.path, message=b'init', author=b'', committer=b'') # Setup target repo cloned from temp test repo clone_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, clone_path) target_repo = porcelain.clone(self.repo.path, target=clone_path, errstream=errstream) target_repo.close() # create a second file to be pushed back to origin handle, fullpath = tempfile.mkstemp(dir=clone_path) os.close(handle) porcelain.add(repo=clone_path, paths=[os.path.basename(fullpath)]) porcelain.commit(repo=clone_path, message=b'push', author=b'', committer=b'') # Setup a non-checked out branch in the remote refs_path = b"refs/heads/foo" self.repo.refs[refs_path] = self.repo[b'HEAD'].id # Push to the remote porcelain.push(clone_path, self.repo.path, b"HEAD:" + refs_path, outstream=outstream, errstream=errstream) # Check that the target and source with closing(Repo(clone_path)) as r_clone: self.assertEqual(r_clone[b'HEAD'].id, self.repo[refs_path].id) # Get the change in the target repo corresponding to the add # this will be in the foo branch. change = list(tree_changes(self.repo, self.repo[b'HEAD'].tree, self.repo[b'refs/heads/foo'].tree))[0] self.assertEqual(os.path.basename(fullpath), change.new.path.decode('ascii')) class PullTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) target_repo.close() # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') self.assertTrue(b'refs/heads/master' in self.repo.refs) self.assertTrue(b'refs/heads/master' in target_repo.refs) # Pull changes into the cloned repo porcelain.pull(target_path, self.repo.path, b'refs/heads/master', outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with closing(Repo(target_path)) as r: self.assertEqual(r[b'HEAD'].id, self.repo[b'HEAD'].id) class StatusTests(PorcelainTestCase): def test_status(self): """Integration test for `status` functionality.""" # Commit a dummy file then modify it fullpath = os.path.join(self.repo.path, 'foo') with open(fullpath, 'w') as f: f.write('origstuff') porcelain.add(repo=self.repo.path, paths=['foo']) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') # modify access and modify time of path os.utime(fullpath, (0, 0)) with open(fullpath, 'wb') as f: f.write(b'stuff') # Make a dummy file and stage it filename_add = 'bar' fullpath = os.path.join(self.repo.path, filename_add) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename_add) results = porcelain.status(self.repo) self.assertEqual(results.staged['add'][0], filename_add.encode('ascii')) self.assertEqual(results.unstaged, [b'foo']) def test_get_tree_changes_add(self): """Unit test for get_tree_changes add.""" # Make a dummy file, stage filename = 'bar' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['add'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 1) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_modify(self): """Unit test for get_tree_changes modify.""" # Make a dummy file, stage, commit, modify filename = 'foo' fullpath = os.path.join(self.repo.path, filename) with open(fullpath, 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') with open(fullpath, 'w') as f: f.write('otherstuff') porcelain.add(repo=self.repo.path, paths=filename) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['modify'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 1) self.assertEqual(len(changes['delete']), 0) def test_get_tree_changes_delete(self): """Unit test for get_tree_changes delete.""" # Make a dummy file, stage, commit, remove filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test status', author=b'', committer=b'') porcelain.rm(repo=self.repo.path, paths=[filename]) changes = porcelain.get_tree_changes(self.repo.path) self.assertEqual(changes['delete'][0], filename.encode('ascii')) self.assertEqual(len(changes['add']), 0) self.assertEqual(len(changes['modify']), 0) self.assertEqual(len(changes['delete']), 1) # TODO(jelmer): Add test for dulwich.porcelain.daemon class UploadPackTests(PorcelainTestCase): """Tests for upload_pack.""" def test_upload_pack(self): outf = BytesIO() exitcode = porcelain.upload_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([b"0000"], outlines) self.assertEqual(0, exitcode) class ReceivePackTests(PorcelainTestCase): """Tests for receive_pack.""" def test_receive_pack(self): filename = 'foo' with open(os.path.join(self.repo.path, filename), 'w') as f: f.write('stuff') porcelain.add(repo=self.repo.path, paths=filename) self.repo.do_commit(message=b'test status', author=b'', committer=b'', author_timestamp=1402354300, commit_timestamp=1402354300, author_timezone=0, commit_timezone=0) outf = BytesIO() exitcode = porcelain.receive_pack(self.repo.path, BytesIO(b"0000"), outf) outlines = outf.getvalue().splitlines() self.assertEqual([ b'006d9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 HEAD\x00 report-status ' b'delete-refs ofs-delta side-band-64k no-done', b'003f9e65bdcf4a22cdd4f3700604a275cd2aaf146b23 refs/heads/master', b'0000'], outlines) self.assertEqual(0, exitcode) class BranchListTests(PorcelainTestCase): def test_standard(self): self.assertEqual(set([]), set(porcelain.branch_list(self.repo))) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchCreateTests(PorcelainTestCase): def test_branch_exists(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertRaises(KeyError, porcelain.branch_create, self.repo, b"foo") porcelain.branch_create(self.repo, b"foo", force=True) def test_new_branch(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b"foo") self.assertEqual( set([b"master", b"foo"]), set(porcelain.branch_list(self.repo))) class BranchDeleteTests(PorcelainTestCase): def test_simple(self): [c1] = build_commit_graph(self.repo.object_store, [[1]]) self.repo[b"HEAD"] = c1.id porcelain.branch_create(self.repo, b'foo') self.assertTrue(b"foo" in porcelain.branch_list(self.repo)) porcelain.branch_delete(self.repo, b'foo') self.assertFalse(b"foo" in porcelain.branch_list(self.repo)) class FetchTests(PorcelainTestCase): def test_simple(self): outstream = BytesIO() errstream = BytesIO() # create a file for initial commit handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test', author=b'test', committer=b'test') # Setup target repo target_path = tempfile.mkdtemp() self.addCleanup(shutil.rmtree, target_path) target_repo = porcelain.clone(self.repo.path, target=target_path, errstream=errstream) # create a second file to be pushed handle, fullpath = tempfile.mkstemp(dir=self.repo.path) os.close(handle) filename = os.path.basename(fullpath) porcelain.add(repo=self.repo.path, paths=filename) porcelain.commit(repo=self.repo.path, message=b'test2', author=b'test2', committer=b'test2') self.assertFalse(self.repo[b'HEAD'].id in target_repo) target_repo.close() # Fetch changes into the cloned repo porcelain.fetch(target_path, self.repo.path, outstream=outstream, errstream=errstream) # Check the target repo for pushed changes with closing(Repo(target_path)) as r: self.assertTrue(self.repo[b'HEAD'].id in r) + + +class RepackTests(PorcelainTestCase): + + def test_empty(self): + porcelain.repack(self.repo) + + def test_simple(self): + handle, fullpath = tempfile.mkstemp(dir=self.repo.path) + os.close(handle) + filename = os.path.basename(fullpath) + porcelain.add(repo=self.repo.path, paths=filename) + porcelain.repack(self.repo)