diff --git a/dulwich/client.py b/dulwich/client.py index 5a7cca15..9cd0e958 100644 --- a/dulwich/client.py +++ b/dulwich/client.py @@ -1,1895 +1,1896 @@ # client.py -- Implementation of the client side git protocols # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Client side support for the Git protocol. The Dulwich client supports the following capabilities: * thin-pack * multi_ack_detailed * multi_ack * side-band-64k * ofs-delta * quiet * report-status * delete-refs * shallow Known capabilities that are not supported: * no-progress * include-tag """ from contextlib import closing from io import BytesIO, BufferedReader import errno import os import select import socket import subprocess import sys from urllib.parse import ( quote as urlquote, unquote as urlunquote, urlparse, urljoin, urlunparse, urlunsplit, + urlunparse, ) import dulwich from dulwich.config import get_xdg_config_home_path from dulwich.errors import ( GitProtocolError, NotGitRepository, SendPackError, UpdateRefsError, ) from dulwich.protocol import ( HangupException, _RBUFSIZE, agent_string, capability_agent, extract_capability_names, CAPABILITY_AGENT, CAPABILITY_DELETE_REFS, CAPABILITY_INCLUDE_TAG, CAPABILITY_MULTI_ACK, CAPABILITY_MULTI_ACK_DETAILED, CAPABILITY_OFS_DELTA, CAPABILITY_QUIET, CAPABILITY_REPORT_STATUS, CAPABILITY_SHALLOW, CAPABILITY_SYMREF, CAPABILITY_SIDE_BAND_64K, CAPABILITY_THIN_PACK, CAPABILITIES_REF, KNOWN_RECEIVE_CAPABILITIES, KNOWN_UPLOAD_CAPABILITIES, COMMAND_DEEPEN, COMMAND_SHALLOW, COMMAND_UNSHALLOW, COMMAND_DONE, COMMAND_HAVE, COMMAND_WANT, SIDE_BAND_CHANNEL_DATA, SIDE_BAND_CHANNEL_PROGRESS, SIDE_BAND_CHANNEL_FATAL, PktLineParser, Protocol, ProtocolFile, TCP_GIT_PORT, ZERO_SHA, extract_capabilities, parse_capability, ) from dulwich.pack import ( write_pack_data, write_pack_objects, ) from dulwich.refs import ( read_info_refs, ANNOTATED_TAG_SUFFIX, ) class InvalidWants(Exception): """Invalid wants.""" def __init__(self, wants): Exception.__init__( self, "requested wants not in server provided refs: %r" % wants) def _fileno_can_read(fileno): """Check if a file descriptor is readable. """ return len(select.select([fileno], [], [], 0)[0]) > 0 def _win32_peek_avail(handle): """Wrapper around PeekNamedPipe to check how many bytes are available. """ from ctypes import byref, wintypes, windll c_avail = wintypes.DWORD() c_message = wintypes.DWORD() success = windll.kernel32.PeekNamedPipe( handle, None, 0, None, byref(c_avail), byref(c_message)) if not success: raise OSError(wintypes.GetLastError()) return c_avail.value COMMON_CAPABILITIES = [CAPABILITY_OFS_DELTA, CAPABILITY_SIDE_BAND_64K] UPLOAD_CAPABILITIES = ([CAPABILITY_THIN_PACK, CAPABILITY_MULTI_ACK, CAPABILITY_MULTI_ACK_DETAILED, CAPABILITY_SHALLOW] + COMMON_CAPABILITIES) RECEIVE_CAPABILITIES = ( [CAPABILITY_REPORT_STATUS, CAPABILITY_DELETE_REFS] + COMMON_CAPABILITIES) class ReportStatusParser(object): """Handle status as reported by servers with 'report-status' capability.""" def __init__(self): self._done = False self._pack_status = None self._ref_status_ok = True self._ref_statuses = [] def check(self): """Check if there were any errors and, if so, raise exceptions. Raises: SendPackError: Raised when the server could not unpack UpdateRefsError: Raised when refs could not be updated """ if self._pack_status not in (b'unpack ok', None): raise SendPackError(self._pack_status) if not self._ref_status_ok: ref_status = {} ok = set() for status in self._ref_statuses: if b' ' not in status: # malformed response, move on to the next one continue status, ref = status.split(b' ', 1) if status == b'ng': if b' ' in ref: ref, status = ref.split(b' ', 1) else: ok.add(ref) ref_status[ref] = status # TODO(jelmer): don't assume encoding of refs is ascii. raise UpdateRefsError(', '.join([ refname.decode('ascii') for refname in ref_status if refname not in ok]) + ' failed to update', ref_status=ref_status) def handle_packet(self, pkt): """Handle a packet. Raises: GitProtocolError: Raised when packets are received after a flush packet. """ if self._done: raise GitProtocolError("received more data after status report") if pkt is None: self._done = True return if self._pack_status is None: self._pack_status = pkt.strip() else: ref_status = pkt.strip() self._ref_statuses.append(ref_status) if not ref_status.startswith(b'ok '): self._ref_status_ok = False def read_pkt_refs(proto): server_capabilities = None refs = {} # Receive refs from server for pkt in proto.read_pkt_seq(): (sha, ref) = pkt.rstrip(b'\n').split(None, 1) if sha == b'ERR': raise GitProtocolError(ref.decode('utf-8', 'replace')) if server_capabilities is None: (ref, server_capabilities) = extract_capabilities(ref) refs[ref] = sha if len(refs) == 0: return {}, set([]) if refs == {CAPABILITIES_REF: ZERO_SHA}: refs = {} return refs, set(server_capabilities) class FetchPackResult(object): """Result of a fetch-pack operation. Attributes: refs: Dictionary with all remote refs symrefs: Dictionary with remote symrefs agent: User agent string """ _FORWARDED_ATTRS = [ 'clear', 'copy', 'fromkeys', 'get', 'has_key', 'items', 'iteritems', 'iterkeys', 'itervalues', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values', 'viewitems', 'viewkeys', 'viewvalues'] def __init__(self, refs, symrefs, agent, new_shallow=None, new_unshallow=None): self.refs = refs self.symrefs = symrefs self.agent = agent self.new_shallow = new_shallow self.new_unshallow = new_unshallow def _warn_deprecated(self): import warnings warnings.warn( "Use FetchPackResult.refs instead.", DeprecationWarning, stacklevel=3) def __eq__(self, other): if isinstance(other, dict): self._warn_deprecated() return (self.refs == other) return (self.refs == other.refs and self.symrefs == other.symrefs and self.agent == other.agent) def __contains__(self, name): self._warn_deprecated() return name in self.refs def __getitem__(self, name): self._warn_deprecated() return self.refs[name] def __len__(self): self._warn_deprecated() return len(self.refs) def __iter__(self): self._warn_deprecated() return iter(self.refs) def __getattribute__(self, name): if name in type(self)._FORWARDED_ATTRS: self._warn_deprecated() return getattr(self.refs, name) return super(FetchPackResult, self).__getattribute__(name) def __repr__(self): return "%s(%r, %r, %r)" % ( self.__class__.__name__, self.refs, self.symrefs, self.agent) def _read_shallow_updates(proto): new_shallow = set() new_unshallow = set() for pkt in proto.read_pkt_seq(): cmd, sha = pkt.split(b' ', 1) if cmd == COMMAND_SHALLOW: new_shallow.add(sha.strip()) elif cmd == COMMAND_UNSHALLOW: new_unshallow.add(sha.strip()) else: raise GitProtocolError('unknown command %s' % pkt) return (new_shallow, new_unshallow) # TODO(durin42): this doesn't correctly degrade if the server doesn't # support some capabilities. This should work properly with servers # that don't support multi_ack. class GitClient(object): """Git smart server client.""" def __init__(self, thin_packs=True, report_activity=None, quiet=False, include_tags=False): """Create a new GitClient instance. Args: thin_packs: Whether or not thin packs should be retrieved report_activity: Optional callback for reporting transport activity. include_tags: send annotated tags when sending the objects they point to """ self._report_activity = report_activity self._report_status_parser = None self._fetch_capabilities = set(UPLOAD_CAPABILITIES) self._fetch_capabilities.add(capability_agent()) self._send_capabilities = set(RECEIVE_CAPABILITIES) self._send_capabilities.add(capability_agent()) if quiet: self._send_capabilities.add(CAPABILITY_QUIET) if not thin_packs: self._fetch_capabilities.remove(CAPABILITY_THIN_PACK) if include_tags: self._fetch_capabilities.add(CAPABILITY_INCLUDE_TAG) def get_url(self, path): """Retrieves full url to given path. Args: path: Repository path (as string) Returns: Url to path (as string) """ raise NotImplementedError(self.get_url) @classmethod def from_parsedurl(cls, parsedurl, **kwargs): """Create an instance of this client from a urlparse.parsed object. Args: parsedurl: Result of urlparse() Returns: A `GitClient` object """ raise NotImplementedError(cls.from_parsedurl) def send_pack(self, path, update_refs, generate_pack_data, progress=None): """Upload a pack to a remote repository. Args: path: Repository path (as bytestring) update_refs: Function to determine changes to remote refs. Receive dict with existing remote refs, returns dict with changed refs (name -> sha, where sha=ZERO_SHA for deletions) generate_pack_data: Function that can return a tuple with number of objects and list of pack data to include progress: Optional progress function Returns: new_refs dictionary containing the changes that were made {refname: new_ref}, including deleted refs. Raises: SendPackError: if server rejects the pack data UpdateRefsError: if the server supports report-status and rejects ref updates """ raise NotImplementedError(self.send_pack) def fetch(self, path, target, determine_wants=None, progress=None, depth=None): """Fetch into a target repository. Args: path: Path to fetch from (as bytestring) target: Target repository to fetch into determine_wants: Optional function to determine what refs to fetch. Receives dictionary of name->sha, should return list of shas to fetch. Defaults to all shas. progress: Optional progress function depth: Depth to fetch at Returns: Dictionary with all remote refs (not just those fetched) """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all if CAPABILITY_THIN_PACK in self._fetch_capabilities: # TODO(jelmer): Avoid reading entire file into memory and # only processing it after the whole file has been fetched. f = BytesIO() def commit(): if f.tell(): f.seek(0) target.object_store.add_thin_pack(f.read, None) def abort(): pass else: f, commit, abort = target.object_store.add_pack() try: result = self.fetch_pack( path, determine_wants, target.get_graph_walker(), f.write, progress=progress, depth=depth) except BaseException: abort() raise else: commit() target.update_shallow(result.new_shallow, result.new_unshallow) return result def fetch_pack(self, path, determine_wants, graph_walker, pack_data, progress=None, depth=None): """Retrieve a pack from a git smart server. Args: path: Remote path to fetch from determine_wants: Function determine what refs to fetch. Receives dictionary of name->sha, should return list of shas to fetch. graph_walker: Object with next() and ack(). pack_data: Callback called for each bit of data in the pack progress: Callback for progress reports (strings) depth: Shallow fetch depth Returns: FetchPackResult object """ raise NotImplementedError(self.fetch_pack) def get_refs(self, path): """Retrieve the current refs from a git smart server. Args: path: Path to the repo to fetch from. (as bytestring) Returns: """ raise NotImplementedError(self.get_refs) def _parse_status_report(self, proto): unpack = proto.read_pkt_line().strip() if unpack != b'unpack ok': st = True # flush remaining error data while st is not None: st = proto.read_pkt_line() raise SendPackError(unpack) statuses = [] errs = False ref_status = proto.read_pkt_line() while ref_status: ref_status = ref_status.strip() statuses.append(ref_status) if not ref_status.startswith(b'ok '): errs = True ref_status = proto.read_pkt_line() if errs: ref_status = {} ok = set() for status in statuses: if b' ' not in status: # malformed response, move on to the next one continue status, ref = status.split(b' ', 1) if status == b'ng': if b' ' in ref: ref, status = ref.split(b' ', 1) else: ok.add(ref) ref_status[ref] = status raise UpdateRefsError(', '.join([ refname for refname in ref_status if refname not in ok]) + b' failed to update', ref_status=ref_status) def _read_side_band64k_data(self, proto, channel_callbacks): """Read per-channel data. This requires the side-band-64k capability. Args: proto: Protocol object to read from channel_callbacks: Dictionary mapping channels to packet handlers to use. None for a callback discards channel data. """ for pkt in proto.read_pkt_seq(): channel = ord(pkt[:1]) pkt = pkt[1:] try: cb = channel_callbacks[channel] except KeyError: raise AssertionError('Invalid sideband channel %d' % channel) else: if cb is not None: cb(pkt) def _handle_receive_pack_head(self, proto, capabilities, old_refs, new_refs): """Handle the head of a 'git-receive-pack' request. Args: proto: Protocol object to read from capabilities: List of negotiated capabilities old_refs: Old refs, as received from the server new_refs: Refs to change Returns: have, want) tuple """ want = [] have = [x for x in old_refs.values() if not x == ZERO_SHA] sent_capabilities = False for refname in new_refs: if not isinstance(refname, bytes): raise TypeError('refname is not a bytestring: %r' % refname) old_sha1 = old_refs.get(refname, ZERO_SHA) if not isinstance(old_sha1, bytes): raise TypeError('old sha1 for %s is not a bytestring: %r' % (refname, old_sha1)) new_sha1 = new_refs.get(refname, ZERO_SHA) if not isinstance(new_sha1, bytes): raise TypeError('old sha1 for %s is not a bytestring %r' % (refname, new_sha1)) if old_sha1 != new_sha1: if sent_capabilities: proto.write_pkt_line(old_sha1 + b' ' + new_sha1 + b' ' + refname) else: proto.write_pkt_line( old_sha1 + b' ' + new_sha1 + b' ' + refname + b'\0' + b' '.join(sorted(capabilities))) sent_capabilities = True if new_sha1 not in have and new_sha1 != ZERO_SHA: want.append(new_sha1) proto.write_pkt_line(None) return (have, want) def _negotiate_receive_pack_capabilities(self, server_capabilities): negotiated_capabilities = ( self._send_capabilities & server_capabilities) unknown_capabilities = ( # noqa: F841 extract_capability_names(server_capabilities) - KNOWN_RECEIVE_CAPABILITIES) # TODO(jelmer): warn about unknown capabilities return negotiated_capabilities def _handle_receive_pack_tail(self, proto, capabilities, progress=None): """Handle the tail of a 'git-receive-pack' request. Args: proto: Protocol object to read from capabilities: List of negotiated capabilities progress: Optional progress reporting function Returns: """ if CAPABILITY_SIDE_BAND_64K in capabilities: if progress is None: def progress(x): pass channel_callbacks = {2: progress} if CAPABILITY_REPORT_STATUS in capabilities: channel_callbacks[1] = PktLineParser( self._report_status_parser.handle_packet).parse self._read_side_band64k_data(proto, channel_callbacks) else: if CAPABILITY_REPORT_STATUS in capabilities: for pkt in proto.read_pkt_seq(): self._report_status_parser.handle_packet(pkt) if self._report_status_parser is not None: self._report_status_parser.check() def _negotiate_upload_pack_capabilities(self, server_capabilities): unknown_capabilities = ( # noqa: F841 extract_capability_names(server_capabilities) - KNOWN_UPLOAD_CAPABILITIES) # TODO(jelmer): warn about unknown capabilities symrefs = {} agent = None for capability in server_capabilities: k, v = parse_capability(capability) if k == CAPABILITY_SYMREF: (src, dst) = v.split(b':', 1) symrefs[src] = dst if k == CAPABILITY_AGENT: agent = v negotiated_capabilities = ( self._fetch_capabilities & server_capabilities) return (negotiated_capabilities, symrefs, agent) def _handle_upload_pack_head(self, proto, capabilities, graph_walker, wants, can_read, depth): """Handle the head of a 'git-upload-pack' request. Args: proto: Protocol object to read from capabilities: List of negotiated capabilities graph_walker: GraphWalker instance to call .ack() on wants: List of commits to fetch can_read: function that returns a boolean that indicates whether there is extra graph data to read on proto depth: Depth for request Returns: """ assert isinstance(wants, list) and isinstance(wants[0], bytes) proto.write_pkt_line(COMMAND_WANT + b' ' + wants[0] + b' ' + b' '.join(sorted(capabilities)) + b'\n') for want in wants[1:]: proto.write_pkt_line(COMMAND_WANT + b' ' + want + b'\n') if depth not in (0, None) or getattr(graph_walker, 'shallow', None): if CAPABILITY_SHALLOW not in capabilities: raise GitProtocolError( "server does not support shallow capability required for " "depth") for sha in graph_walker.shallow: proto.write_pkt_line(COMMAND_SHALLOW + b' ' + sha + b'\n') if depth is not None: proto.write_pkt_line(COMMAND_DEEPEN + b' ' + str(depth).encode('ascii') + b'\n') proto.write_pkt_line(None) if can_read is not None: (new_shallow, new_unshallow) = _read_shallow_updates(proto) else: new_shallow = new_unshallow = None else: new_shallow = new_unshallow = set() proto.write_pkt_line(None) have = next(graph_walker) while have: proto.write_pkt_line(COMMAND_HAVE + b' ' + have + b'\n') if can_read is not None and can_read(): pkt = proto.read_pkt_line() parts = pkt.rstrip(b'\n').split(b' ') if parts[0] == b'ACK': graph_walker.ack(parts[1]) if parts[2] in (b'continue', b'common'): pass elif parts[2] == b'ready': break else: raise AssertionError( "%s not in ('continue', 'ready', 'common)" % parts[2]) have = next(graph_walker) proto.write_pkt_line(COMMAND_DONE + b'\n') return (new_shallow, new_unshallow) def _handle_upload_pack_tail(self, proto, capabilities, graph_walker, pack_data, progress=None, rbufsize=_RBUFSIZE): """Handle the tail of a 'git-upload-pack' request. Args: proto: Protocol object to read from capabilities: List of negotiated capabilities graph_walker: GraphWalker instance to call .ack() on pack_data: Function to call with pack data progress: Optional progress reporting function rbufsize: Read buffer size Returns: """ pkt = proto.read_pkt_line() while pkt: parts = pkt.rstrip(b'\n').split(b' ') if parts[0] == b'ACK': graph_walker.ack(parts[1]) if len(parts) < 3 or parts[2] not in ( b'ready', b'continue', b'common'): break pkt = proto.read_pkt_line() if CAPABILITY_SIDE_BAND_64K in capabilities: if progress is None: # Just ignore progress data def progress(x): pass self._read_side_band64k_data(proto, { SIDE_BAND_CHANNEL_DATA: pack_data, SIDE_BAND_CHANNEL_PROGRESS: progress} ) else: while True: data = proto.read(rbufsize) if data == b"": break pack_data(data) def check_wants(wants, refs): """Check that a set of wants is valid. Args: wants: Set of object SHAs to fetch refs: Refs dictionary to check against Returns: """ missing = set(wants) - { v for (k, v) in refs.items() if not k.endswith(ANNOTATED_TAG_SUFFIX)} if missing: raise InvalidWants(missing) def remote_error_from_stderr(stderr): if stderr is None: return HangupException() for l in stderr.readlines(): if l.startswith(b'ERROR: '): return GitProtocolError( l[len(b'ERROR: '):].decode('utf-8', 'replace')) return GitProtocolError(l.decode('utf-8', 'replace')) return HangupException() class TraditionalGitClient(GitClient): """Traditional Git client.""" DEFAULT_ENCODING = 'utf-8' def __init__(self, path_encoding=DEFAULT_ENCODING, **kwargs): self._remote_path_encoding = path_encoding super(TraditionalGitClient, self).__init__(**kwargs) def _connect(self, cmd, path): """Create a connection to the server. This method is abstract - concrete implementations should implement their own variant which connects to the server and returns an initialized Protocol object with the service ready for use and a can_read function which may be used to see if reads would block. Args: cmd: The git service name to which we should connect. path: The path we should pass to the service. (as bytestirng) """ raise NotImplementedError() def send_pack(self, path, update_refs, generate_pack_data, progress=None): """Upload a pack to a remote repository. Args: path: Repository path (as bytestring) update_refs: Function to determine changes to remote refs. Receive dict with existing remote refs, returns dict with changed refs (name -> sha, where sha=ZERO_SHA for deletions) generate_pack_data: Function that can return a tuple with number of objects and pack data to upload. progress: Optional callback called with progress updates Returns: new_refs dictionary containing the changes that were made {refname: new_ref}, including deleted refs. Raises: SendPackError: if server rejects the pack data UpdateRefsError: if the server supports report-status and rejects ref updates """ proto, unused_can_read, stderr = self._connect(b'receive-pack', path) with proto: try: old_refs, server_capabilities = read_pkt_refs(proto) except HangupException: raise remote_error_from_stderr(stderr) negotiated_capabilities = \ self._negotiate_receive_pack_capabilities(server_capabilities) if CAPABILITY_REPORT_STATUS in negotiated_capabilities: self._report_status_parser = ReportStatusParser() report_status_parser = self._report_status_parser try: new_refs = orig_new_refs = update_refs(dict(old_refs)) except BaseException: proto.write_pkt_line(None) raise if CAPABILITY_DELETE_REFS not in server_capabilities: # Server does not support deletions. Fail later. new_refs = dict(orig_new_refs) for ref, sha in orig_new_refs.items(): if sha == ZERO_SHA: if CAPABILITY_REPORT_STATUS in negotiated_capabilities: report_status_parser._ref_statuses.append( b'ng ' + sha + b' remote does not support deleting refs') report_status_parser._ref_status_ok = False del new_refs[ref] if new_refs is None: proto.write_pkt_line(None) return old_refs if len(new_refs) == 0 and len(orig_new_refs): # NOOP - Original new refs filtered out by policy proto.write_pkt_line(None) if report_status_parser is not None: report_status_parser.check() return old_refs (have, want) = self._handle_receive_pack_head( proto, negotiated_capabilities, old_refs, new_refs) if (not want and set(new_refs.items()).issubset(set(old_refs.items()))): return new_refs pack_data_count, pack_data = generate_pack_data( have, want, ofs_delta=(CAPABILITY_OFS_DELTA in negotiated_capabilities)) dowrite = bool(pack_data_count) dowrite = dowrite or any(old_refs.get(ref) != sha for (ref, sha) in new_refs.items() if sha != ZERO_SHA) if dowrite: write_pack_data(proto.write_file(), pack_data_count, pack_data) self._handle_receive_pack_tail( proto, negotiated_capabilities, progress) return new_refs def fetch_pack(self, path, determine_wants, graph_walker, pack_data, progress=None, depth=None): """Retrieve a pack from a git smart server. Args: path: Remote path to fetch from determine_wants: Function determine what refs to fetch. Receives dictionary of name->sha, should return list of shas to fetch. graph_walker: Object with next() and ack(). pack_data: Callback called for each bit of data in the pack progress: Callback for progress reports (strings) depth: Shallow fetch depth Returns: FetchPackResult object """ proto, can_read, stderr = self._connect(b'upload-pack', path) with proto: try: refs, server_capabilities = read_pkt_refs(proto) except HangupException: raise remote_error_from_stderr(stderr) negotiated_capabilities, symrefs, agent = ( self._negotiate_upload_pack_capabilities( server_capabilities)) if refs is None: proto.write_pkt_line(None) return FetchPackResult(refs, symrefs, agent) try: wants = determine_wants(refs) except BaseException: proto.write_pkt_line(None) raise if wants is not None: wants = [cid for cid in wants if cid != ZERO_SHA] if not wants: proto.write_pkt_line(None) return FetchPackResult(refs, symrefs, agent) (new_shallow, new_unshallow) = self._handle_upload_pack_head( proto, negotiated_capabilities, graph_walker, wants, can_read, depth=depth) self._handle_upload_pack_tail( proto, negotiated_capabilities, graph_walker, pack_data, progress) return FetchPackResult( refs, symrefs, agent, new_shallow, new_unshallow) def get_refs(self, path): """Retrieve the current refs from a git smart server. """ # stock `git ls-remote` uses upload-pack proto, _, stderr = self._connect(b'upload-pack', path) with proto: try: refs, _ = read_pkt_refs(proto) except HangupException: raise remote_error_from_stderr(stderr) proto.write_pkt_line(None) return refs def archive(self, path, committish, write_data, progress=None, write_error=None, format=None, subdirs=None, prefix=None): proto, can_read, stderr = self._connect(b'upload-archive', path) with proto: if format is not None: proto.write_pkt_line(b"argument --format=" + format) proto.write_pkt_line(b"argument " + committish) if subdirs is not None: for subdir in subdirs: proto.write_pkt_line(b"argument " + subdir) if prefix is not None: proto.write_pkt_line(b"argument --prefix=" + prefix) proto.write_pkt_line(None) try: pkt = proto.read_pkt_line() except HangupException: raise remote_error_from_stderr(stderr) if pkt == b"NACK\n": return elif pkt == b"ACK\n": pass elif pkt.startswith(b"ERR "): raise GitProtocolError( pkt[4:].rstrip(b"\n").decode('utf-8', 'replace')) else: raise AssertionError("invalid response %r" % pkt) ret = proto.read_pkt_line() if ret is not None: raise AssertionError("expected pkt tail") self._read_side_band64k_data(proto, { SIDE_BAND_CHANNEL_DATA: write_data, SIDE_BAND_CHANNEL_PROGRESS: progress, SIDE_BAND_CHANNEL_FATAL: write_error}) class TCPGitClient(TraditionalGitClient): """A Git Client that works over TCP directly (i.e. git://).""" def __init__(self, host, port=None, **kwargs): if port is None: port = TCP_GIT_PORT self._host = host self._port = port super(TCPGitClient, self).__init__(**kwargs) @classmethod def from_parsedurl(cls, parsedurl, **kwargs): return cls(parsedurl.hostname, port=parsedurl.port, **kwargs) def get_url(self, path): netloc = self._host if self._port is not None and self._port != TCP_GIT_PORT: netloc += ":%d" % self._port return urlunsplit(("git", netloc, path, '', '')) def _connect(self, cmd, path): if not isinstance(cmd, bytes): raise TypeError(cmd) if not isinstance(path, bytes): path = path.encode(self._remote_path_encoding) sockaddrs = socket.getaddrinfo( self._host, self._port, socket.AF_UNSPEC, socket.SOCK_STREAM) s = None err = socket.error("no address found for %s" % self._host) for (family, socktype, proto, canonname, sockaddr) in sockaddrs: s = socket.socket(family, socktype, proto) s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) try: s.connect(sockaddr) break except socket.error as e: err = e if s is not None: s.close() s = None if s is None: raise err # -1 means system default buffering rfile = s.makefile('rb', -1) # 0 means unbuffered wfile = s.makefile('wb', 0) def close(): rfile.close() wfile.close() s.close() proto = Protocol(rfile.read, wfile.write, close, report_activity=self._report_activity) if path.startswith(b"/~"): path = path[1:] # TODO(jelmer): Alternative to ascii? proto.send_cmd( b'git-' + cmd, path, b'host=' + self._host.encode('ascii')) return proto, lambda: _fileno_can_read(s), None class SubprocessWrapper(object): """A socket-like object that talks to a subprocess via pipes.""" def __init__(self, proc): self.proc = proc self.read = BufferedReader(proc.stdout).read self.write = proc.stdin.write @property def stderr(self): return self.proc.stderr def can_read(self): if sys.platform == 'win32': from msvcrt import get_osfhandle handle = get_osfhandle(self.proc.stdout.fileno()) return _win32_peek_avail(handle) != 0 else: return _fileno_can_read(self.proc.stdout.fileno()) def close(self): self.proc.stdin.close() self.proc.stdout.close() if self.proc.stderr: self.proc.stderr.close() self.proc.wait() def find_git_command(): """Find command to run for system Git (usually C Git).""" if sys.platform == 'win32': # support .exe, .bat and .cmd try: # to avoid overhead import win32api except ImportError: # run through cmd.exe with some overhead return ['cmd', '/c', 'git'] else: status, git = win32api.FindExecutable('git') return [git] else: return ['git'] class SubprocessGitClient(TraditionalGitClient): """Git client that talks to a server using a subprocess.""" @classmethod def from_parsedurl(cls, parsedurl, **kwargs): return cls(**kwargs) git_command = None def _connect(self, service, path): if not isinstance(service, bytes): raise TypeError(service) if isinstance(path, bytes): path = path.decode(self._remote_path_encoding) if self.git_command is None: git_command = find_git_command() argv = git_command + [service.decode('ascii'), path] p = subprocess.Popen(argv, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) pw = SubprocessWrapper(p) return (Protocol(pw.read, pw.write, pw.close, report_activity=self._report_activity), pw.can_read, p.stderr) class LocalGitClient(GitClient): """Git Client that just uses a local Repo.""" def __init__(self, thin_packs=True, report_activity=None, config=None): """Create a new LocalGitClient instance. Args: thin_packs: Whether or not thin packs should be retrieved report_activity: Optional callback for reporting transport activity. """ self._report_activity = report_activity # Ignore the thin_packs argument def get_url(self, path): return urlunsplit(('file', '', path, '', '')) @classmethod def from_parsedurl(cls, parsedurl, **kwargs): return cls(**kwargs) @classmethod def _open_repo(cls, path): from dulwich.repo import Repo if not isinstance(path, str): path = path.decode(sys.getfilesystemencoding()) return closing(Repo(path)) def send_pack(self, path, update_refs, generate_pack_data, progress=None): """Upload a pack to a remote repository. Args: path: Repository path (as bytestring) update_refs: Function to determine changes to remote refs. Receive dict with existing remote refs, returns dict with changed refs (name -> sha, where sha=ZERO_SHA for deletions) with number of items and pack data to upload. progress: Optional progress function Returns: new_refs dictionary containing the changes that were made {refname: new_ref}, including deleted refs. Raises: SendPackError: if server rejects the pack data UpdateRefsError: if the server supports report-status and rejects ref updates """ if not progress: def progress(x): pass with self._open_repo(path) as target: old_refs = target.get_refs() new_refs = update_refs(dict(old_refs)) have = [sha1 for sha1 in old_refs.values() if sha1 != ZERO_SHA] want = [] for refname, new_sha1 in new_refs.items(): if (new_sha1 not in have and new_sha1 not in want and new_sha1 != ZERO_SHA): want.append(new_sha1) if (not want and set(new_refs.items()).issubset(set(old_refs.items()))): return new_refs target.object_store.add_pack_data( *generate_pack_data(have, want, ofs_delta=True)) for refname, new_sha1 in new_refs.items(): old_sha1 = old_refs.get(refname, ZERO_SHA) if new_sha1 != ZERO_SHA: if not target.refs.set_if_equals( refname, old_sha1, new_sha1): progress('unable to set %s to %s' % (refname, new_sha1)) else: if not target.refs.remove_if_equals(refname, old_sha1): progress('unable to remove %s' % refname) return new_refs def fetch(self, path, target, determine_wants=None, progress=None, depth=None): """Fetch into a target repository. Args: path: Path to fetch from (as bytestring) target: Target repository to fetch into determine_wants: Optional function determine what refs to fetch. Receives dictionary of name->sha, should return list of shas to fetch. Defaults to all shas. progress: Optional progress function depth: Shallow fetch depth Returns: FetchPackResult object """ with self._open_repo(path) as r: refs = r.fetch(target, determine_wants=determine_wants, progress=progress, depth=depth) return FetchPackResult(refs, r.refs.get_symrefs(), agent_string()) def fetch_pack(self, path, determine_wants, graph_walker, pack_data, progress=None, depth=None): """Retrieve a pack from a git smart server. Args: path: Remote path to fetch from determine_wants: Function determine what refs to fetch. Receives dictionary of name->sha, should return list of shas to fetch. graph_walker: Object with next() and ack(). pack_data: Callback called for each bit of data in the pack progress: Callback for progress reports (strings) depth: Shallow fetch depth Returns: FetchPackResult object """ with self._open_repo(path) as r: objects_iter = r.fetch_objects( determine_wants, graph_walker, progress=progress, depth=depth) symrefs = r.refs.get_symrefs() agent = agent_string() # Did the process short-circuit (e.g. in a stateless RPC call)? # Note that the client still expects a 0-object pack in most cases. if objects_iter is None: return FetchPackResult(None, symrefs, agent) protocol = ProtocolFile(None, pack_data) write_pack_objects(protocol, objects_iter) return FetchPackResult(r.get_refs(), symrefs, agent) def get_refs(self, path): """Retrieve the current refs from a git smart server. """ with self._open_repo(path) as target: return target.get_refs() # What Git client to use for local access default_local_git_client_cls = LocalGitClient class SSHVendor(object): """A client side SSH implementation.""" def connect_ssh(self, host, command, username=None, port=None, password=None, key_filename=None): # This function was deprecated in 0.9.1 import warnings warnings.warn( "SSHVendor.connect_ssh has been renamed to SSHVendor.run_command", DeprecationWarning) return self.run_command(host, command, username=username, port=port, password=password, key_filename=key_filename) def run_command(self, host, command, username=None, port=None, password=None, key_filename=None): """Connect to an SSH server. Run a command remotely and return a file-like object for interaction with the remote command. Args: host: Host name command: Command to run (as argv array) username: Optional ame of user to log in as port: Optional SSH port to use password: Optional ssh password for login or private key key_filename: Optional path to private keyfile Returns: """ raise NotImplementedError(self.run_command) class StrangeHostname(Exception): """Refusing to connect to strange SSH hostname.""" def __init__(self, hostname): super(StrangeHostname, self).__init__(hostname) class SubprocessSSHVendor(SSHVendor): """SSH vendor that shells out to the local 'ssh' command.""" def run_command(self, host, command, username=None, port=None, password=None, key_filename=None): if password is not None: raise NotImplementedError( "Setting password not supported by SubprocessSSHVendor.") args = ['ssh', '-x'] if port: args.extend(['-p', str(port)]) if key_filename: args.extend(['-i', str(key_filename)]) if username: host = '%s@%s' % (username, host) if host.startswith('-'): raise StrangeHostname(hostname=host) args.append(host) proc = subprocess.Popen(args + [command], bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return SubprocessWrapper(proc) class PLinkSSHVendor(SSHVendor): """SSH vendor that shells out to the local 'plink' command.""" def run_command(self, host, command, username=None, port=None, password=None, key_filename=None): if sys.platform == 'win32': args = ['plink.exe', '-ssh'] else: args = ['plink', '-ssh'] if password is not None: import warnings warnings.warn( "Invoking PLink with a password exposes the password in the " "process list.") args.extend(['-pw', str(password)]) if port: args.extend(['-P', str(port)]) if key_filename: args.extend(['-i', str(key_filename)]) if username: host = '%s@%s' % (username, host) if host.startswith('-'): raise StrangeHostname(hostname=host) args.append(host) proc = subprocess.Popen(args + [command], bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return SubprocessWrapper(proc) def ParamikoSSHVendor(**kwargs): import warnings warnings.warn( "ParamikoSSHVendor has been moved to dulwich.contrib.paramiko_vendor.", DeprecationWarning) from dulwich.contrib.paramiko_vendor import ParamikoSSHVendor return ParamikoSSHVendor(**kwargs) # Can be overridden by users get_ssh_vendor = SubprocessSSHVendor class SSHGitClient(TraditionalGitClient): def __init__(self, host, port=None, username=None, vendor=None, config=None, password=None, key_filename=None, **kwargs): self.host = host self.port = port self.username = username self.password = password self.key_filename = key_filename super(SSHGitClient, self).__init__(**kwargs) self.alternative_paths = {} if vendor is not None: self.ssh_vendor = vendor else: self.ssh_vendor = get_ssh_vendor() def get_url(self, path): netloc = self.host if self.port is not None: netloc += ":%d" % self.port if self.username is not None: netloc = urlquote(self.username, '@/:') + "@" + netloc return urlunsplit(('ssh', netloc, path, '', '')) @classmethod def from_parsedurl(cls, parsedurl, **kwargs): return cls(host=parsedurl.hostname, port=parsedurl.port, username=parsedurl.username, **kwargs) def _get_cmd_path(self, cmd): cmd = self.alternative_paths.get(cmd, b'git-' + cmd) assert isinstance(cmd, bytes) return cmd def _connect(self, cmd, path): if not isinstance(cmd, bytes): raise TypeError(cmd) if isinstance(path, bytes): path = path.decode(self._remote_path_encoding) if path.startswith("/~"): path = path[1:] argv = (self._get_cmd_path(cmd).decode(self._remote_path_encoding) + " '" + path + "'") kwargs = {} if self.password is not None: kwargs['password'] = self.password if self.key_filename is not None: kwargs['key_filename'] = self.key_filename con = self.ssh_vendor.run_command( self.host, argv, port=self.port, username=self.username, **kwargs) return (Protocol(con.read, con.write, con.close, report_activity=self._report_activity), con.can_read, getattr(con, 'stderr', None)) def default_user_agent_string(): # Start user agent with "git/", because GitHub requires this. :-( See # https://github.com/jelmer/dulwich/issues/562 for details. return "git/dulwich/%s" % ".".join([str(x) for x in dulwich.__version__]) def default_urllib3_manager(config, pool_manager_cls=None, proxy_manager_cls=None, **override_kwargs): """Return `urllib3` connection pool manager. Honour detected proxy configurations. Args: config: dulwich.config.ConfigDict` instance with Git configuration. kwargs: Additional arguments for urllib3.ProxyManager Returns: `pool_manager_cls` (defaults to `urllib3.ProxyManager`) instance for proxy configurations, `proxy_manager_cls` (defaults to `urllib3.PoolManager`) instance otherwise. """ proxy_server = user_agent = None ca_certs = ssl_verify = None if config is not None: try: proxy_server = config.get(b"http", b"proxy") except KeyError: pass try: user_agent = config.get(b"http", b"useragent") except KeyError: pass # TODO(jelmer): Support per-host settings try: ssl_verify = config.get_boolean(b"http", b"sslVerify") except KeyError: ssl_verify = True try: ca_certs = config.get(b"http", b"sslCAInfo") except KeyError: ca_certs = None if user_agent is None: user_agent = default_user_agent_string() headers = {"User-agent": user_agent} kwargs = {} if ssl_verify is True: kwargs['cert_reqs'] = "CERT_REQUIRED" elif ssl_verify is False: kwargs['cert_reqs'] = 'CERT_NONE' else: # Default to SSL verification kwargs['cert_reqs'] = "CERT_REQUIRED" if ca_certs is not None: kwargs['ca_certs'] = ca_certs kwargs.update(override_kwargs) # Try really hard to find a SSL certificate path if 'ca_certs' not in kwargs and kwargs.get('cert_reqs') != 'CERT_NONE': try: import certifi except ImportError: pass else: kwargs['ca_certs'] = certifi.where() import urllib3 if proxy_server is not None: if proxy_manager_cls is None: proxy_manager_cls = urllib3.ProxyManager # `urllib3` requires a `str` object in both Python 2 and 3, while # `ConfigDict` coerces entries to `bytes` on Python 3. Compensate. if not isinstance(proxy_server, str): proxy_server = proxy_server.decode() manager = proxy_manager_cls(proxy_server, headers=headers, **kwargs) else: if pool_manager_cls is None: pool_manager_cls = urllib3.PoolManager manager = pool_manager_cls(headers=headers, **kwargs) return manager class HttpGitClient(GitClient): def __init__(self, base_url, dumb=None, pool_manager=None, config=None, username=None, password=None, **kwargs): self._base_url = base_url.rstrip("/") + "/" self._username = username self._password = password self.dumb = dumb if pool_manager is None: self.pool_manager = default_urllib3_manager(config) else: self.pool_manager = pool_manager if username is not None: # No escaping needed: ":" is not allowed in username: # https://tools.ietf.org/html/rfc2617#section-2 credentials = "%s:%s" % (username, password) import urllib3.util basic_auth = urllib3.util.make_headers(basic_auth=credentials) self.pool_manager.headers.update(basic_auth) GitClient.__init__(self, **kwargs) def get_url(self, path): return self._get_url(path).rstrip("/") @classmethod def from_parsedurl(cls, parsedurl, **kwargs): password = parsedurl.password if password is not None: kwargs['password'] = urlunquote(password) username = parsedurl.username if username is not None: kwargs['username'] = urlunquote(username) netloc = parsedurl.hostname if parsedurl.port: netloc = "%s:%s" % (netloc, parsedurl.port) if parsedurl.username: netloc = "%s@%s" % (parsedurl.username, netloc) parsedurl = parsedurl._replace(netloc=netloc) return cls(urlunparse(parsedurl), **kwargs) def __repr__(self): return "%s(%r, dumb=%r)" % ( type(self).__name__, self._base_url, self.dumb) def _get_url(self, path): if not isinstance(path, str): # urllib3.util.url._encode_invalid_chars() converts the path back # to bytes using the utf-8 codec. path = path.decode('utf-8') return urljoin(self._base_url, path).rstrip("/") + "/" def _http_request(self, url, headers=None, data=None, allow_compression=False): """Perform HTTP request. Args: url: Request URL. headers: Optional custom headers to override defaults. data: Request data. allow_compression: Allow GZipped communication. Returns: Tuple (`response`, `read`), where response is an `urllib3` response object with additional `content_type` and `redirect_location` properties, and `read` is a consumable read method for the response data. """ req_headers = self.pool_manager.headers.copy() if headers is not None: req_headers.update(headers) req_headers["Pragma"] = "no-cache" if allow_compression: req_headers["Accept-Encoding"] = "gzip" else: req_headers["Accept-Encoding"] = "identity" if data is None: resp = self.pool_manager.request("GET", url, headers=req_headers) else: resp = self.pool_manager.request("POST", url, headers=req_headers, body=data) if resp.status == 404: raise NotGitRepository() elif resp.status != 200: raise GitProtocolError("unexpected http resp %d for %s" % (resp.status, url)) # TODO: Optimization available by adding `preload_content=False` to the # request and just passing the `read` method on instead of going via # `BytesIO`, if we can guarantee that the entire response is consumed # before issuing the next to still allow for connection reuse from the # pool. read = BytesIO(resp.data).read resp.content_type = resp.getheader("Content-Type") # Check if geturl() is available (urllib3 version >= 1.23) try: resp_url = resp.geturl() except AttributeError: # get_redirect_location() is available for urllib3 >= 1.1 resp.redirect_location = resp.get_redirect_location() else: resp.redirect_location = resp_url if resp_url != url else '' return resp, read def _discover_references(self, service, base_url): assert base_url[-1] == "/" tail = "info/refs" headers = {"Accept": "*/*"} if self.dumb is not True: tail += "?service=%s" % service.decode('ascii') url = urljoin(base_url, tail) resp, read = self._http_request(url, headers, allow_compression=True) if resp.redirect_location: # Something changed (redirect!), so let's update the base URL if not resp.redirect_location.endswith(tail): raise GitProtocolError( "Redirected from URL %s to URL %s without %s" % ( url, resp.redirect_location, tail)) base_url = resp.redirect_location[:-len(tail)] try: self.dumb = not resp.content_type.startswith("application/x-git-") if not self.dumb: proto = Protocol(read, None) # The first line should mention the service try: [pkt] = list(proto.read_pkt_seq()) except ValueError: raise GitProtocolError( "unexpected number of packets received") if pkt.rstrip(b'\n') != (b'# service=' + service): raise GitProtocolError( "unexpected first line %r from smart server" % pkt) return read_pkt_refs(proto) + (base_url, ) else: return read_info_refs(resp), set(), base_url finally: resp.close() def _smart_request(self, service, url, data): assert url[-1] == "/" url = urljoin(url, service) result_content_type = "application/x-%s-result" % service headers = { "Content-Type": "application/x-%s-request" % service, "Accept": result_content_type, "Content-Length": str(len(data)), } resp, read = self._http_request(url, headers, data) if resp.content_type != result_content_type: raise GitProtocolError("Invalid content-type from server: %s" % resp.content_type) return resp, read def send_pack(self, path, update_refs, generate_pack_data, progress=None): """Upload a pack to a remote repository. Args: path: Repository path (as bytestring) update_refs: Function to determine changes to remote refs. Receives dict with existing remote refs, returns dict with changed refs (name -> sha, where sha=ZERO_SHA for deletions) generate_pack_data: Function that can return a tuple with number of elements and pack data to upload. progress: Optional progress function Returns: new_refs dictionary containing the changes that were made {refname: new_ref}, including deleted refs. Raises: SendPackError: if server rejects the pack data UpdateRefsError: if the server supports report-status and rejects ref updates """ url = self._get_url(path) old_refs, server_capabilities, url = self._discover_references( b"git-receive-pack", url) negotiated_capabilities = self._negotiate_receive_pack_capabilities( server_capabilities) negotiated_capabilities.add(capability_agent()) if CAPABILITY_REPORT_STATUS in negotiated_capabilities: self._report_status_parser = ReportStatusParser() new_refs = update_refs(dict(old_refs)) if new_refs is None: # Determine wants function is aborting the push. return old_refs if self.dumb: raise NotImplementedError(self.fetch_pack) req_data = BytesIO() req_proto = Protocol(None, req_data.write) (have, want) = self._handle_receive_pack_head( req_proto, negotiated_capabilities, old_refs, new_refs) if not want and set(new_refs.items()).issubset(set(old_refs.items())): return new_refs pack_data_count, pack_data = generate_pack_data( have, want, ofs_delta=(CAPABILITY_OFS_DELTA in negotiated_capabilities)) if pack_data_count: write_pack_data(req_proto.write_file(), pack_data_count, pack_data) resp, read = self._smart_request("git-receive-pack", url, data=req_data.getvalue()) try: resp_proto = Protocol(read, None) self._handle_receive_pack_tail( resp_proto, negotiated_capabilities, progress) return new_refs finally: resp.close() def fetch_pack(self, path, determine_wants, graph_walker, pack_data, progress=None, depth=None): """Retrieve a pack from a git smart server. Args: path: Path to fetch from determine_wants: Callback that returns list of commits to fetch graph_walker: Object with next() and ack(). pack_data: Callback called for each bit of data in the pack progress: Callback for progress reports (strings) depth: Depth for request Returns: FetchPackResult object """ url = self._get_url(path) refs, server_capabilities, url = self._discover_references( b"git-upload-pack", url) negotiated_capabilities, symrefs, agent = ( self._negotiate_upload_pack_capabilities( server_capabilities)) wants = determine_wants(refs) if wants is not None: wants = [cid for cid in wants if cid != ZERO_SHA] if not wants: return FetchPackResult(refs, symrefs, agent) if self.dumb: raise NotImplementedError(self.send_pack) req_data = BytesIO() req_proto = Protocol(None, req_data.write) (new_shallow, new_unshallow) = self._handle_upload_pack_head( req_proto, negotiated_capabilities, graph_walker, wants, can_read=None, depth=depth) resp, read = self._smart_request( "git-upload-pack", url, data=req_data.getvalue()) try: resp_proto = Protocol(read, None) if new_shallow is None and new_unshallow is None: (new_shallow, new_unshallow) = _read_shallow_updates( resp_proto) self._handle_upload_pack_tail( resp_proto, negotiated_capabilities, graph_walker, pack_data, progress) return FetchPackResult( refs, symrefs, agent, new_shallow, new_unshallow) finally: resp.close() def get_refs(self, path): """Retrieve the current refs from a git smart server. """ url = self._get_url(path) refs, _, _ = self._discover_references( b"git-upload-pack", url) return refs def get_transport_and_path_from_url(url, config=None, **kwargs): """Obtain a git client from a URL. Args: url: URL to open (a unicode string) config: Optional config object thin_packs: Whether or not thin packs should be retrieved report_activity: Optional callback for reporting transport activity. Returns: Tuple with client instance and relative path. """ parsed = urlparse(url) if parsed.scheme == 'git': return (TCPGitClient.from_parsedurl(parsed, **kwargs), parsed.path) elif parsed.scheme in ('git+ssh', 'ssh'): return SSHGitClient.from_parsedurl(parsed, **kwargs), parsed.path elif parsed.scheme in ('http', 'https'): return HttpGitClient.from_parsedurl( parsed, config=config, **kwargs), parsed.path elif parsed.scheme == 'file': return default_local_git_client_cls.from_parsedurl( parsed, **kwargs), parsed.path raise ValueError("unknown scheme '%s'" % parsed.scheme) def parse_rsync_url(location): """Parse a rsync-style URL. """ if ':' in location and '@' not in location: # SSH with no user@, zero or one leading slash. (host, path) = location.split(':', 1) user = None elif ':' in location: # SSH with user@host:foo. user_host, path = location.split(':', 1) if '@' in user_host: user, host = user_host.rsplit('@', 1) else: user = None host = user_host else: raise ValueError('not a valid rsync-style URL') return (user, host, path) def get_transport_and_path(location, **kwargs): """Obtain a git client from a URL. Args: location: URL or path (a string) config: Optional config object thin_packs: Whether or not thin packs should be retrieved report_activity: Optional callback for reporting transport activity. Returns: Tuple with client instance and relative path. """ # First, try to parse it as a URL try: return get_transport_and_path_from_url(location, **kwargs) except ValueError: pass if (sys.platform == 'win32' and location[0].isalpha() and location[1:3] == ':\\'): # Windows local path return default_local_git_client_cls(**kwargs), location try: (username, hostname, path) = parse_rsync_url(location) except ValueError: # Otherwise, assume it's a local path. return default_local_git_client_cls(**kwargs), location else: return SSHGitClient(hostname, username=username, **kwargs), path DEFAULT_GIT_CREDENTIALS_PATHS = [ os.path.expanduser('~/.git-credentials'), get_xdg_config_home_path('git', 'credentials')] def get_credentials_from_store(scheme, hostname, username=None, fnames=DEFAULT_GIT_CREDENTIALS_PATHS): for fname in fnames: try: with open(fname, 'rb') as f: for line in f: parsed_line = urlparse.urlparse(line) if (parsed_line.scheme == scheme and parsed_line.hostname == hostname and (username is None or parsed_line.username == username)): return parsed_line.username, parsed_line.password except OSError as e: if e.errno != errno.ENOENT: raise # If the file doesn't exist, try the next one. continue diff --git a/dulwich/objects.py b/dulwich/objects.py index e081eac7..a0300dc6 100644 --- a/dulwich/objects.py +++ b/dulwich/objects.py @@ -1,1443 +1,1443 @@ # objects.py -- Access to base git objects # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Access to base git objects.""" import binascii from io import BytesIO from collections import namedtuple import os import posixpath import stat from typing import ( Optional, Dict, Union, Type, ) import warnings import zlib from hashlib import sha1 from dulwich.errors import ( ChecksumMismatch, NotBlobError, NotCommitError, NotTagError, NotTreeError, ObjectFormatException, FileFormatException, ) from dulwich.file import GitFile ZERO_SHA = b'0' * 40 # Header fields for commits _TREE_HEADER = b'tree' _PARENT_HEADER = b'parent' _AUTHOR_HEADER = b'author' _COMMITTER_HEADER = b'committer' _ENCODING_HEADER = b'encoding' _MERGETAG_HEADER = b'mergetag' _GPGSIG_HEADER = b'gpgsig' # Header fields for objects _OBJECT_HEADER = b'object' _TYPE_HEADER = b'type' _TAG_HEADER = b'tag' _TAGGER_HEADER = b'tagger' S_IFGITLINK = 0o160000 MAX_TIME = 9223372036854775807 # (2**63) - 1 - signed long int max BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----" class EmptyFileException(FileFormatException): """An unexpectedly empty file was encountered.""" def S_ISGITLINK(m): """Check if a mode indicates a submodule. Args: m: Mode to check Returns: a ``boolean`` """ return (stat.S_IFMT(m) == S_IFGITLINK) def _decompress(string): dcomp = zlib.decompressobj() dcomped = dcomp.decompress(string) dcomped += dcomp.flush() return dcomped def sha_to_hex(sha): """Takes a string and returns the hex of the sha within""" hexsha = binascii.hexlify(sha) assert len(hexsha) == 40, "Incorrect length of sha1 string: %d" % hexsha return hexsha def hex_to_sha(hex): """Takes a hex sha and returns a binary sha""" assert len(hex) == 40, "Incorrect length of hexsha: %s" % hex try: return binascii.unhexlify(hex) except TypeError as exc: if not isinstance(hex, bytes): raise raise ValueError(exc.args[0]) def valid_hexsha(hex): if len(hex) != 40: return False try: binascii.unhexlify(hex) except (TypeError, binascii.Error): return False else: return True def hex_to_filename(path, hex): """Takes a hex sha and returns its filename relative to the given path.""" # os.path.join accepts bytes or unicode, but all args must be of the same # type. Make sure that hex which is expected to be bytes, is the same type # as path. if getattr(path, 'encode', None) is not None: hex = hex.decode('ascii') dir = hex[:2] file = hex[2:] # Check from object dir return os.path.join(path, dir, file) def filename_to_hex(filename): """Takes an object filename and returns its corresponding hex sha.""" # grab the last (up to) two path components names = filename.rsplit(os.path.sep, 2)[-2:] errmsg = "Invalid object filename: %s" % filename assert len(names) == 2, errmsg base, rest = names assert len(base) == 2 and len(rest) == 38, errmsg hex = (base + rest).encode('ascii') hex_to_sha(hex) return hex def object_header(num_type: int, length: int) -> bytes: """Return an object header for the given numeric type and text length.""" return (object_class(num_type).type_name + b' ' + str(length).encode('ascii') + b'\0') -def serializable_property(name: str, docstring:Optional[str]=None): +def serializable_property(name: str, docstring: Optional[str] = None): """A property that helps tracking whether serialization is necessary. """ def set(obj, value): setattr(obj, "_"+name, value) obj._needs_serialization = True def get(obj): return getattr(obj, "_"+name) return property(get, set, doc=docstring) def object_class(type): """Get the object class corresponding to the given type. Args: type: Either a type name string or a numeric type. Returns: The ShaFile subclass corresponding to the given type, or None if type is not a valid type name/number. """ return _TYPE_MAP.get(type, None) def check_hexsha(hex, error_msg): """Check if a string is a valid hex sha string. Args: hex: Hex string to check error_msg: Error message to use in exception Raises: ObjectFormatException: Raised when the string is not valid """ if not valid_hexsha(hex): raise ObjectFormatException("%s %s" % (error_msg, hex)) def check_identity(identity, error_msg): """Check if the specified identity is valid. This will raise an exception if the identity is not valid. Args: identity: Identity string error_msg: Error message to use in exception """ email_start = identity.find(b'<') email_end = identity.find(b'>') if (email_start < 0 or email_end < 0 or email_end <= email_start or identity.find(b'<', email_start + 1) >= 0 or identity.find(b'>', email_end + 1) >= 0 or not identity.endswith(b'>')): raise ObjectFormatException(error_msg) def check_time(time_seconds): """Check if the specified time is not prone to overflow error. This will raise an exception if the time is not valid. Args: time_info: author/committer/tagger info """ # Prevent overflow error if time_seconds > MAX_TIME: raise ObjectFormatException( 'Date field should not exceed %s' % MAX_TIME) def git_line(*items): """Formats items into a space separated line.""" return b' '.join(items) + b'\n' class FixedSha(object): """SHA object that behaves like hashlib's but is given a fixed value.""" __slots__ = ('_hexsha', '_sha') def __init__(self, hexsha): if getattr(hexsha, 'encode', None) is not None: hexsha = hexsha.encode('ascii') if not isinstance(hexsha, bytes): raise TypeError('Expected bytes for hexsha, got %r' % hexsha) self._hexsha = hexsha self._sha = hex_to_sha(hexsha) def digest(self): """Return the raw SHA digest.""" return self._sha def hexdigest(self): """Return the hex SHA digest.""" return self._hexsha.decode('ascii') class ShaFile(object): """A git SHA file.""" __slots__ = ('_chunked_text', '_sha', '_needs_serialization') - type_name:bytes - type_num:int + type_name: bytes + type_num: int @staticmethod def _parse_legacy_object_header(magic, f): """Parse a legacy object, creating it but not reading the file.""" bufsize = 1024 decomp = zlib.decompressobj() header = decomp.decompress(magic) start = 0 end = -1 while end < 0: extra = f.read(bufsize) header += decomp.decompress(extra) magic += extra end = header.find(b'\0', start) start = len(header) header = header[:end] type_name, size = header.split(b' ', 1) try: int(size) # sanity check except ValueError as e: raise ObjectFormatException("Object size not an integer: %s" % e) obj_class = object_class(type_name) if not obj_class: raise ObjectFormatException("Not a known type: %s" % type_name) return obj_class() def _parse_legacy_object(self, map): """Parse a legacy object, setting the raw string.""" text = _decompress(map) header_end = text.find(b'\0') if header_end < 0: raise ObjectFormatException("Invalid object header, no \\0") self.set_raw_string(text[header_end+1:]) def as_legacy_object_chunks(self, compression_level=-1): """Return chunks representing the object in the experimental format. Returns: List of strings """ compobj = zlib.compressobj(compression_level) yield compobj.compress(self._header()) for chunk in self.as_raw_chunks(): yield compobj.compress(chunk) yield compobj.flush() def as_legacy_object(self, compression_level=-1): """Return string representing the object in the experimental format. """ return b''.join(self.as_legacy_object_chunks( compression_level=compression_level)) def as_raw_chunks(self): """Return chunks with serialization of the object. Returns: List of strings, not necessarily one per line """ if self._needs_serialization: self._sha = None self._chunked_text = self._serialize() self._needs_serialization = False return self._chunked_text def as_raw_string(self): """Return raw string with serialization of the object. Returns: String object """ return b''.join(self.as_raw_chunks()) def __bytes__(self): """Return raw string serialization of this object.""" return self.as_raw_string() def __hash__(self): """Return unique hash for this object.""" return hash(self.id) def as_pretty_string(self): """Return a string representing this object, fit for display.""" return self.as_raw_string() def set_raw_string(self, text, sha=None): """Set the contents of this object from a serialized string.""" if not isinstance(text, bytes): raise TypeError('Expected bytes for text, got %r' % text) self.set_raw_chunks([text], sha) def set_raw_chunks(self, chunks, sha=None): """Set the contents of this object from a list of chunks.""" self._chunked_text = chunks self._deserialize(chunks) if sha is None: self._sha = None else: self._sha = FixedSha(sha) self._needs_serialization = False @staticmethod def _parse_object_header(magic, f): """Parse a new style object, creating it but not reading the file.""" num_type = (ord(magic[0:1]) >> 4) & 7 obj_class = object_class(num_type) if not obj_class: raise ObjectFormatException("Not a known type %d" % num_type) return obj_class() def _parse_object(self, map): """Parse a new style object, setting self._text.""" # skip type and size; type must have already been determined, and # we trust zlib to fail if it's otherwise corrupted byte = ord(map[0:1]) used = 1 while (byte & 0x80) != 0: byte = ord(map[used:used+1]) used += 1 raw = map[used:] self.set_raw_string(_decompress(raw)) @classmethod def _is_legacy_object(cls, magic): b0 = ord(magic[0:1]) b1 = ord(magic[1:2]) word = (b0 << 8) + b1 return (b0 & 0x8F) == 0x08 and (word % 31) == 0 @classmethod def _parse_file(cls, f): map = f.read() if not map: raise EmptyFileException('Corrupted empty file detected') if cls._is_legacy_object(map): obj = cls._parse_legacy_object_header(map, f) obj._parse_legacy_object(map) else: obj = cls._parse_object_header(map, f) obj._parse_object(map) return obj def __init__(self): """Don't call this directly""" self._sha = None self._chunked_text = [] self._needs_serialization = True def _deserialize(self, chunks): raise NotImplementedError(self._deserialize) def _serialize(self): raise NotImplementedError(self._serialize) @classmethod def from_path(cls, path): """Open a SHA file from disk.""" with GitFile(path, 'rb') as f: return cls.from_file(f) @classmethod def from_file(cls, f): """Get the contents of a SHA file on disk.""" try: obj = cls._parse_file(f) obj._sha = None return obj except (IndexError, ValueError): raise ObjectFormatException("invalid object header") @staticmethod def from_raw_string(type_num, string, sha=None): """Creates an object of the indicated type from the raw string given. Args: type_num: The numeric type of the object. string: The raw uncompressed contents. sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_string(string, sha) return obj @staticmethod def from_raw_chunks(type_num, chunks, sha=None): """Creates an object of the indicated type from the raw chunks given. Args: type_num: The numeric type of the object. chunks: An iterable of the raw uncompressed contents. sha: Optional known sha for the object """ obj = object_class(type_num)() obj.set_raw_chunks(chunks, sha) return obj @classmethod def from_string(cls, string): """Create a ShaFile from a string.""" obj = cls() obj.set_raw_string(string) return obj def _check_has_member(self, member, error_msg): """Check that the object has a given member variable. Args: member: the member variable to check for error_msg: the message for an error if the member is missing Raises: ObjectFormatException: with the given error_msg if member is missing or is None """ if getattr(self, member, None) is None: raise ObjectFormatException(error_msg) def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way ChecksumMismatch: if the object was created with a SHA that does not match its contents """ # TODO: if we find that error-checking during object parsing is a # performance bottleneck, those checks should be moved to the class's # check() method during optimization so we can still check the object # when necessary. old_sha = self.id try: self._deserialize(self.as_raw_chunks()) self._sha = None new_sha = self.id except Exception as e: raise ObjectFormatException(e) if old_sha != new_sha: raise ChecksumMismatch(new_sha, old_sha) def _header(self): return object_header(self.type, self.raw_length()) def raw_length(self): """Returns the length of the raw string of this object.""" ret = 0 for chunk in self.as_raw_chunks(): ret += len(chunk) return ret def sha(self): """The SHA1 object that is the name of this object.""" if self._sha is None or self._needs_serialization: # this is a local because as_raw_chunks() overwrites self._sha new_sha = sha1() new_sha.update(self._header()) for chunk in self.as_raw_chunks(): new_sha.update(chunk) self._sha = new_sha return self._sha def copy(self): """Create a new copy of this SHA1 object from its raw string""" obj_class = object_class(self.get_type()) return obj_class.from_raw_string( self.get_type(), self.as_raw_string(), self.id) @property def id(self): """The hex SHA of this object.""" return self.sha().hexdigest().encode('ascii') def get_type(self): """Return the type number for this object class.""" return self.type_num def set_type(self, type): """Set the type number for this object class.""" self.type_num = type # DEPRECATED: use type_num or type_name as needed. type = property(get_type, set_type) def __repr__(self): return "<%s %s>" % (self.__class__.__name__, self.id) def __ne__(self, other): """Check whether this object does not match the other.""" return not isinstance(other, ShaFile) or self.id != other.id def __eq__(self, other): """Return True if the SHAs of the two objects match. """ return isinstance(other, ShaFile) and self.id == other.id def __lt__(self, other): """Return whether SHA of this object is less than the other. """ if not isinstance(other, ShaFile): raise TypeError return self.id < other.id def __le__(self, other): """Check whether SHA of this object is less than or equal to the other. """ if not isinstance(other, ShaFile): raise TypeError return self.id <= other.id def __cmp__(self, other): """Compare the SHA of this object with that of the other object. """ if not isinstance(other, ShaFile): raise TypeError return cmp(self.id, other.id) # noqa: F821 class Blob(ShaFile): """A Git Blob object.""" __slots__ = () type_name = b'blob' type_num = 3 def __init__(self): super(Blob, self).__init__() self._chunked_text = [] self._needs_serialization = False def _get_data(self): return self.as_raw_string() def _set_data(self, data): self.set_raw_string(data) data = property(_get_data, _set_data, doc="The text contained within the blob object.") def _get_chunked(self): return self._chunked_text def _set_chunked(self, chunks): self._chunked_text = chunks def _serialize(self): return self._chunked_text def _deserialize(self, chunks): self._chunked_text = chunks chunked = property( _get_chunked, _set_chunked, doc="The text in the blob object, as chunks (not necessarily lines)") @classmethod def from_path(cls, path): blob = ShaFile.from_path(path) if not isinstance(blob, cls): raise NotBlobError(path) return blob def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Blob, self).check() def splitlines(self): """Return list of lines in this blob. This preserves the original line endings. """ chunks = self.chunked if not chunks: return [] if len(chunks) == 1: return chunks[0].splitlines(True) remaining = None ret = [] for chunk in chunks: lines = chunk.splitlines(True) if len(lines) > 1: ret.append((remaining or b"") + lines[0]) ret.extend(lines[1:-1]) remaining = lines[-1] elif len(lines) == 1: if remaining is None: remaining = lines.pop() else: remaining += lines.pop() if remaining is not None: ret.append(remaining) return ret def _parse_message(chunks): """Parse a message with a list of fields and a body. Args: chunks: the raw chunks of the tag or commit object. Returns: iterator of tuples of (field, value), one per header line, in the order read from the text, possibly including duplicates. Includes a field named None for the freeform tag/commit text. """ f = BytesIO(b''.join(chunks)) k = None v = "" eof = False def _strip_last_newline(value): """Strip the last newline from value""" if value and value.endswith(b'\n'): return value[:-1] return value # Parse the headers # # Headers can contain newlines. The next line is indented with a space. # We store the latest key as 'k', and the accumulated value as 'v'. for line in f: if line.startswith(b' '): # Indented continuation of the previous line v += line[1:] else: if k is not None: # We parsed a new header, return its value yield (k, _strip_last_newline(v)) if line == b'\n': # Empty line indicates end of headers break (k, v) = line.split(b' ', 1) else: # We reached end of file before the headers ended. We still need to # return the previous header, then we need to return a None field for # the text. eof = True if k is not None: yield (k, _strip_last_newline(v)) yield (None, None) if not eof: # We didn't reach the end of file while parsing headers. We can return # the rest of the file as a message. yield (None, f.read()) f.close() class Tag(ShaFile): """A Git Tag object.""" type_name = b'tag' type_num = 4 __slots__ = ('_tag_timezone_neg_utc', '_name', '_object_sha', '_object_class', '_tag_time', '_tag_timezone', '_tagger', '_message', '_signature') def __init__(self): super(Tag, self).__init__() self._tagger = None self._tag_time = None self._tag_timezone = None self._tag_timezone_neg_utc = False self._signature = None @classmethod def from_path(cls, filename): tag = ShaFile.from_path(filename) if not isinstance(tag, cls): raise NotTagError(filename) return tag def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Tag, self).check() self._check_has_member("_object_sha", "missing object sha") self._check_has_member("_object_class", "missing object type") self._check_has_member("_name", "missing tag name") if not self._name: raise ObjectFormatException("empty tag name") check_hexsha(self._object_sha, "invalid object sha") if getattr(self, "_tagger", None): check_identity(self._tagger, "invalid tagger") self._check_has_member("_tag_time", "missing tag time") check_time(self._tag_time) last = None for field, _ in _parse_message(self._chunked_text): if field == _OBJECT_HEADER and last is not None: raise ObjectFormatException("unexpected object") elif field == _TYPE_HEADER and last != _OBJECT_HEADER: raise ObjectFormatException("unexpected type") elif field == _TAG_HEADER and last != _TYPE_HEADER: raise ObjectFormatException("unexpected tag name") elif field == _TAGGER_HEADER and last != _TAG_HEADER: raise ObjectFormatException("unexpected tagger") last = field def _serialize(self): chunks = [] chunks.append(git_line(_OBJECT_HEADER, self._object_sha)) chunks.append(git_line(_TYPE_HEADER, self._object_class.type_name)) chunks.append(git_line(_TAG_HEADER, self._name)) if self._tagger: if self._tag_time is None: chunks.append(git_line(_TAGGER_HEADER, self._tagger)) else: chunks.append(git_line( _TAGGER_HEADER, self._tagger, str(self._tag_time).encode('ascii'), format_timezone( self._tag_timezone, self._tag_timezone_neg_utc))) if self._message is not None: chunks.append(b'\n') # To close headers chunks.append(self._message) if self._signature is not None: chunks.append(self._signature) return chunks def _deserialize(self, chunks): """Grab the metadata attached to the tag""" self._tagger = None self._tag_time = None self._tag_timezone = None self._tag_timezone_neg_utc = False for field, value in _parse_message(chunks): if field == _OBJECT_HEADER: self._object_sha = value elif field == _TYPE_HEADER: obj_class = object_class(value) if not obj_class: raise ObjectFormatException("Not a known type: %s" % value) self._object_class = obj_class elif field == _TAG_HEADER: self._name = value elif field == _TAGGER_HEADER: (self._tagger, self._tag_time, (self._tag_timezone, self._tag_timezone_neg_utc)) = parse_time_entry(value) elif field is None: if value is None: self._message = None self._signature = None else: try: sig_idx = value.index(BEGIN_PGP_SIGNATURE) except ValueError: self._message = value self._signature = None else: self._message = value[:sig_idx] self._signature = value[sig_idx:] else: raise ObjectFormatException("Unknown field %s" % field) def _get_object(self): """Get the object pointed to by this tag. Returns: tuple of (object class, sha). """ return (self._object_class, self._object_sha) def _set_object(self, value): (self._object_class, self._object_sha) = value self._needs_serialization = True object = property(_get_object, _set_object) name = serializable_property("name", "The name of this tag") tagger = serializable_property( "tagger", "Returns the name of the person who created this tag") tag_time = serializable_property( "tag_time", "The creation timestamp of the tag. As the number of seconds " "since the epoch") tag_timezone = serializable_property( "tag_timezone", "The timezone that tag_time is in.") message = serializable_property( "message", "the message attached to this tag") signature = serializable_property( "signature", "Optional detached GPG signature") class TreeEntry(namedtuple('TreeEntry', ['path', 'mode', 'sha'])): """Named tuple encapsulating a single tree entry.""" def in_path(self, path): """Return a copy of this entry with the given path prepended.""" if not isinstance(self.path, bytes): raise TypeError('Expected bytes for path, got %r' % path) return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha) def parse_tree(text, strict=False): """Parse a tree text. Args: text: Serialized text to parse Returns: iterator of tuples of (name, mode, sha) Raises: ObjectFormatException: if the object was malformed in some way """ count = 0 length = len(text) while count < length: mode_end = text.index(b' ', count) mode_text = text[count:mode_end] if strict and mode_text.startswith(b'0'): raise ObjectFormatException("Invalid mode '%s'" % mode_text) try: mode = int(mode_text, 8) except ValueError: raise ObjectFormatException("Invalid mode '%s'" % mode_text) name_end = text.index(b'\0', mode_end) name = text[mode_end+1:name_end] count = name_end+21 sha = text[name_end+1:count] if len(sha) != 20: raise ObjectFormatException("Sha has invalid length") hexsha = sha_to_hex(sha) yield (name, mode, hexsha) def serialize_tree(items): """Serialize the items in a tree to a text. Args: items: Sorted iterable over (name, mode, sha) tuples Returns: Serialized tree text as chunks """ for name, mode, hexsha in items: yield (("%04o" % mode).encode('ascii') + b' ' + name + b'\0' + hex_to_sha(hexsha)) def sorted_tree_items(entries, name_order): """Iterate over a tree entries dictionary. Args: name_order: If True, iterate entries in order of their name. If False, iterate entries in tree order, that is, treat subtree entries as having '/' appended. entries: Dictionary mapping names to (mode, sha) tuples Returns: Iterator over (name, mode, hexsha) """ key_func = name_order and key_entry_name_order or key_entry for name, entry in sorted(entries.items(), key=key_func): mode, hexsha = entry # Stricter type checks than normal to mirror checks in the C version. mode = int(mode) if not isinstance(hexsha, bytes): raise TypeError('Expected bytes for SHA, got %r' % hexsha) yield TreeEntry(name, mode, hexsha) def key_entry(entry): """Sort key for tree entry. Args: entry: (name, value) tuplee """ (name, value) = entry if stat.S_ISDIR(value[0]): name += b'/' return name def key_entry_name_order(entry): """Sort key for tree entry in name order.""" return entry[0] def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8"): """Pretty format tree entry. Args: name: Name of the directory entry mode: Mode of entry hexsha: Hexsha of the referenced object Returns: string describing the tree entry """ if mode & stat.S_IFDIR: kind = "tree" else: kind = "blob" return "%04o %s %s\t%s\n" % ( mode, kind, hexsha.decode('ascii'), name.decode(encoding, 'replace')) class Tree(ShaFile): """A Git tree object""" type_name = b'tree' type_num = 2 __slots__ = ('_entries') def __init__(self): super(Tree, self).__init__() self._entries = {} @classmethod def from_path(cls, filename): tree = ShaFile.from_path(filename) if not isinstance(tree, cls): raise NotTreeError(filename) return tree def __contains__(self, name): return name in self._entries def __getitem__(self, name): return self._entries[name] def __setitem__(self, name, value): """Set a tree entry by name. Args: name: The name of the entry, as a string. value: A tuple of (mode, hexsha), where mode is the mode of the entry as an integral type and hexsha is the hex SHA of the entry as a string. """ mode, hexsha = value self._entries[name] = (mode, hexsha) self._needs_serialization = True def __delitem__(self, name): del self._entries[name] self._needs_serialization = True def __len__(self): return len(self._entries) def __iter__(self): return iter(self._entries) def add(self, name, mode, hexsha): """Add an entry to the tree. Args: mode: The mode of the entry as an integral type. Not all possible modes are supported by git; see check() for details. name: The name of the entry, as a string. hexsha: The hex SHA of the entry as a string. """ if isinstance(name, int) and isinstance(mode, bytes): (name, mode) = (mode, name) warnings.warn( "Please use Tree.add(name, mode, hexsha)", category=DeprecationWarning, stacklevel=2) self._entries[name] = mode, hexsha self._needs_serialization = True def iteritems(self, name_order=False): """Iterate over entries. Args: name_order: If True, iterate in name order instead of tree order. Returns: Iterator over (name, mode, sha) tuples """ return sorted_tree_items(self._entries, name_order) def items(self): """Return the sorted entries in this tree. Returns: List with (name, mode, sha) tuples """ return list(self.iteritems()) def _deserialize(self, chunks): """Grab the entries in the tree""" try: parsed_entries = parse_tree(b''.join(chunks)) except ValueError as e: raise ObjectFormatException(e) # TODO: list comprehension is for efficiency in the common (small) # case; if memory efficiency in the large case is a concern, use a # genexp. self._entries = dict([(n, (m, s)) for n, m, s in parsed_entries]) def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Tree, self).check() last = None allowed_modes = (stat.S_IFREG | 0o755, stat.S_IFREG | 0o644, stat.S_IFLNK, stat.S_IFDIR, S_IFGITLINK, # TODO: optionally exclude as in git fsck --strict stat.S_IFREG | 0o664) for name, mode, sha in parse_tree(b''.join(self._chunked_text), True): check_hexsha(sha, 'invalid sha %s' % sha) if b'/' in name or name in (b'', b'.', b'..', b'.git'): raise ObjectFormatException( 'invalid name %s' % name.decode('utf-8', 'replace')) if mode not in allowed_modes: raise ObjectFormatException('invalid mode %06o' % mode) entry = (name, (mode, sha)) if last: if key_entry(last) > key_entry(entry): raise ObjectFormatException('entries not sorted') if name == last[0]: raise ObjectFormatException('duplicate entry %s' % name) last = entry def _serialize(self): return list(serialize_tree(self.iteritems())) def as_pretty_string(self): text = [] for name, mode, hexsha in self.iteritems(): text.append(pretty_format_tree_entry(name, mode, hexsha)) return "".join(text) def lookup_path(self, lookup_obj, path): """Look up an object in a Git tree. Args: lookup_obj: Callback for retrieving object by SHA1 path: Path to lookup Returns: A tuple of (mode, SHA) of the resulting path. """ parts = path.split(b'/') sha = self.id mode = None for p in parts: if not p: continue obj = lookup_obj(sha) if not isinstance(obj, Tree): raise NotTreeError(sha) mode, sha = obj[p] return mode, sha def parse_timezone(text): """Parse a timezone text fragment (e.g. '+0100'). Args: text: Text to parse. Returns: Tuple with timezone as seconds difference to UTC and a boolean indicating whether this was a UTC timezone prefixed with a negative sign (-0000). """ # cgit parses the first character as the sign, and the rest # as an integer (using strtol), which could also be negative. # We do the same for compatibility. See #697828. if not text[0] in b'+-': raise ValueError("Timezone must start with + or - (%(text)s)" % vars()) sign = text[:1] offset = int(text[1:]) if sign == b'-': offset = -offset unnecessary_negative_timezone = (offset >= 0 and sign == b'-') signum = (offset < 0) and -1 or 1 offset = abs(offset) hours = int(offset / 100) minutes = (offset % 100) return (signum * (hours * 3600 + minutes * 60), unnecessary_negative_timezone) def format_timezone(offset, unnecessary_negative_timezone=False): """Format a timezone for Git serialization. Args: offset: Timezone offset as seconds difference to UTC unnecessary_negative_timezone: Whether to use a minus sign for UTC or positive timezones (-0000 and --700 rather than +0000 / +0700). """ if offset % 60 != 0: raise ValueError("Unable to handle non-minute offset.") if offset < 0 or unnecessary_negative_timezone: sign = '-' offset = -offset else: sign = '+' return ('%c%02d%02d' % (sign, offset / 3600, (offset / 60) % 60)).encode('ascii') def parse_time_entry(value): """Parse time entry behavior Args: value: Bytes representing a git commit/tag line Raises: ObjectFormatException in case of parsing error (malformed field date) Returns: Tuple of (author, time, (timezone, timezone_neg_utc)) """ try: sep = value.rindex(b'> ') except ValueError: return (value, None, (None, False)) try: person = value[0:sep+1] rest = value[sep+2:] timetext, timezonetext = rest.rsplit(b' ', 1) time = int(timetext) timezone, timezone_neg_utc = parse_timezone(timezonetext) except ValueError as e: raise ObjectFormatException(e) return person, time, (timezone, timezone_neg_utc) def parse_commit(chunks): """Parse a commit object from chunks. Args: chunks: Chunks to parse Returns: Tuple of (tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra) """ parents = [] extra = [] tree = None author_info = (None, None, (None, None)) commit_info = (None, None, (None, None)) encoding = None mergetag = [] message = None gpgsig = None for field, value in _parse_message(chunks): # TODO(jelmer): Enforce ordering if field == _TREE_HEADER: tree = value elif field == _PARENT_HEADER: parents.append(value) elif field == _AUTHOR_HEADER: author_info = parse_time_entry(value) elif field == _COMMITTER_HEADER: commit_info = parse_time_entry(value) elif field == _ENCODING_HEADER: encoding = value elif field == _MERGETAG_HEADER: mergetag.append(Tag.from_string(value + b'\n')) elif field == _GPGSIG_HEADER: gpgsig = value elif field is None: message = value else: extra.append((field, value)) return (tree, parents, author_info, commit_info, encoding, mergetag, gpgsig, message, extra) class Commit(ShaFile): """A git commit object""" type_name = b'commit' type_num = 1 __slots__ = ('_parents', '_encoding', '_extra', '_author_timezone_neg_utc', '_commit_timezone_neg_utc', '_commit_time', '_author_time', '_author_timezone', '_commit_timezone', '_author', '_committer', '_tree', '_message', '_mergetag', '_gpgsig') def __init__(self): super(Commit, self).__init__() self._parents = [] self._encoding = None self._mergetag = [] self._gpgsig = None self._extra = [] self._author_timezone_neg_utc = False self._commit_timezone_neg_utc = False @classmethod def from_path(cls, path): commit = ShaFile.from_path(path) if not isinstance(commit, cls): raise NotCommitError(path) return commit def _deserialize(self, chunks): (self._tree, self._parents, author_info, commit_info, self._encoding, self._mergetag, self._gpgsig, self._message, self._extra) = ( parse_commit(chunks)) (self._author, self._author_time, (self._author_timezone, self._author_timezone_neg_utc)) = author_info (self._committer, self._commit_time, (self._commit_timezone, self._commit_timezone_neg_utc)) = commit_info def check(self): """Check this object for internal consistency. Raises: ObjectFormatException: if the object is malformed in some way """ super(Commit, self).check() self._check_has_member("_tree", "missing tree") self._check_has_member("_author", "missing author") self._check_has_member("_committer", "missing committer") self._check_has_member("_author_time", "missing author time") self._check_has_member("_commit_time", "missing commit time") for parent in self._parents: check_hexsha(parent, "invalid parent sha") check_hexsha(self._tree, "invalid tree sha") check_identity(self._author, "invalid author") check_identity(self._committer, "invalid committer") check_time(self._author_time) check_time(self._commit_time) last = None for field, _ in _parse_message(self._chunked_text): if field == _TREE_HEADER and last is not None: raise ObjectFormatException("unexpected tree") elif field == _PARENT_HEADER and last not in (_PARENT_HEADER, _TREE_HEADER): raise ObjectFormatException("unexpected parent") elif field == _AUTHOR_HEADER and last not in (_TREE_HEADER, _PARENT_HEADER): raise ObjectFormatException("unexpected author") elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER: raise ObjectFormatException("unexpected committer") elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER: raise ObjectFormatException("unexpected encoding") last = field # TODO: optionally check for duplicate parents def _serialize(self): chunks = [] tree_bytes = ( self._tree.id if isinstance(self._tree, Tree) else self._tree) chunks.append(git_line(_TREE_HEADER, tree_bytes)) for p in self._parents: chunks.append(git_line(_PARENT_HEADER, p)) chunks.append(git_line( _AUTHOR_HEADER, self._author, str(self._author_time).encode('ascii'), format_timezone( self._author_timezone, self._author_timezone_neg_utc))) chunks.append(git_line( _COMMITTER_HEADER, self._committer, str(self._commit_time).encode('ascii'), format_timezone(self._commit_timezone, self._commit_timezone_neg_utc))) if self.encoding: chunks.append(git_line(_ENCODING_HEADER, self.encoding)) for mergetag in self.mergetag: mergetag_chunks = mergetag.as_raw_string().split(b'\n') chunks.append(git_line(_MERGETAG_HEADER, mergetag_chunks[0])) # Embedded extra header needs leading space for chunk in mergetag_chunks[1:]: chunks.append(b' ' + chunk + b'\n') # No trailing empty line if chunks[-1].endswith(b' \n'): chunks[-1] = chunks[-1][:-2] for k, v in self.extra: if b'\n' in k or b'\n' in v: raise AssertionError( "newline in extra data: %r -> %r" % (k, v)) chunks.append(git_line(k, v)) if self.gpgsig: sig_chunks = self.gpgsig.split(b'\n') chunks.append(git_line(_GPGSIG_HEADER, sig_chunks[0])) for chunk in sig_chunks[1:]: chunks.append(git_line(b'', chunk)) chunks.append(b'\n') # There must be a new line after the headers chunks.append(self._message) return chunks tree = serializable_property( "tree", "Tree that is the state of this commit") def _get_parents(self): """Return a list of parents of this commit.""" return self._parents def _set_parents(self, value): """Set a list of parents of this commit.""" self._needs_serialization = True self._parents = value parents = property(_get_parents, _set_parents, doc="Parents of this commit, by their SHA1.") def _get_extra(self): """Return extra settings of this commit.""" return self._extra extra = property( _get_extra, doc="Extra header fields not understood (presumably added in a " "newer version of git). Kept verbatim so the object can " "be correctly reserialized. For private commit metadata, use " "pseudo-headers in Commit.message, rather than this field.") author = serializable_property( "author", "The name of the author of the commit") committer = serializable_property( "committer", "The name of the committer of the commit") message = serializable_property( "message", "The commit message") commit_time = serializable_property( "commit_time", "The timestamp of the commit. As the number of seconds since the " "epoch.") commit_timezone = serializable_property( "commit_timezone", "The zone the commit time is in") author_time = serializable_property( "author_time", "The timestamp the commit was written. As the number of " "seconds since the epoch.") author_timezone = serializable_property( "author_timezone", "Returns the zone the author time is in.") encoding = serializable_property( "encoding", "Encoding of the commit message.") mergetag = serializable_property( "mergetag", "Associated signed tag.") gpgsig = serializable_property( "gpgsig", "GPG Signature.") OBJECT_CLASSES = ( Commit, Tree, Blob, Tag, ) -_TYPE_MAP:Dict[Union[bytes,int],Type[ShaFile]] = {} +_TYPE_MAP: Dict[Union[bytes, int], Type[ShaFile]] = {} for cls in OBJECT_CLASSES: _TYPE_MAP[cls.type_name] = cls _TYPE_MAP[cls.type_num] = cls # Hold on to the pure-python implementations for testing _parse_tree_py = parse_tree _sorted_tree_items_py = sorted_tree_items try: # Try to import C versions from dulwich._objects import parse_tree, sorted_tree_items # type: ignore except ImportError: pass diff --git a/dulwich/tests/compat/test_web.py b/dulwich/tests/compat/test_web.py index 393e9f9e..d99daa9f 100644 --- a/dulwich/tests/compat/test_web.py +++ b/dulwich/tests/compat/test_web.py @@ -1,207 +1,207 @@ # test_web.py -- Compatibility tests for the git web server. # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Compatibility tests between Dulwich and the cgit HTTP server. warning: these tests should be fairly stable, but when writing/debugging new tests, deadlocks may freeze the test process such that it cannot be Ctrl-C'ed. On POSIX systems, you can kill the tests with Ctrl-Z, "kill %". """ import threading from wsgiref import simple_server import sys from typing import Tuple from dulwich.server import ( DictBackend, UploadPackHandler, ReceivePackHandler, ) from dulwich.tests import ( SkipTest, skipIf, ) from dulwich.web import ( make_wsgi_chain, HTTPGitApplication, WSGIRequestHandlerLogger, WSGIServerLogger, ) from dulwich.tests.compat.server_utils import ( ServerTests, NoSideBand64kReceivePackHandler, ) from dulwich.tests.compat.utils import ( CompatTestCase, ) @skipIf(sys.platform == 'win32', 'Broken on windows, with very long fail time.') class WebTests(ServerTests): """Base tests for web server tests. Contains utility and setUp/tearDown methods, but does non inherit from TestCase so tests are not automatically run. """ protocol = 'http' def _start_server(self, repo): backend = DictBackend({'/': repo}) app = self._make_app(backend) dul_server = simple_server.make_server( 'localhost', 0, app, server_class=WSGIServerLogger, handler_class=WSGIRequestHandlerLogger) self.addCleanup(dul_server.shutdown) self.addCleanup(dul_server.server_close) threading.Thread(target=dul_server.serve_forever).start() self._server = dul_server _, port = dul_server.socket.getsockname() return port @skipIf(sys.platform == 'win32', 'Broken on windows, with very long fail time.') class SmartWebTestCase(WebTests, CompatTestCase): """Test cases for smart HTTP server. This server test case does not use side-band-64k in git-receive-pack. """ - min_git_version:Tuple[int, ...] = (1, 6, 6) + min_git_version: Tuple[int, ...] = (1, 6, 6) def _handlers(self): return {b'git-receive-pack': NoSideBand64kReceivePackHandler} def _check_app(self, app): receive_pack_handler_cls = app.handlers[b'git-receive-pack'] caps = receive_pack_handler_cls.capabilities() self.assertNotIn(b'side-band-64k', caps) def _make_app(self, backend): app = make_wsgi_chain(backend, handlers=self._handlers()) to_check = app # peel back layers until we're at the base application while not issubclass(to_check.__class__, HTTPGitApplication): to_check = to_check.app self._check_app(to_check) return app def patch_capabilities(handler, caps_removed): # Patch a handler's capabilities by specifying a list of them to be # removed, and return the original classmethod for restoration. original_capabilities = handler.capabilities filtered_capabilities = [ i for i in original_capabilities() if i not in caps_removed] def capabilities(cls): return filtered_capabilities handler.capabilities = classmethod(capabilities) return original_capabilities @skipIf(sys.platform == 'win32', 'Broken on windows, with very long fail time.') class SmartWebSideBand64kTestCase(SmartWebTestCase): """Test cases for smart HTTP server with side-band-64k support.""" # side-band-64k in git-receive-pack was introduced in git 1.7.0.2 min_git_version = (1, 7, 0, 2) def setUp(self): self.o_uph_cap = patch_capabilities(UploadPackHandler, (b"no-done",)) self.o_rph_cap = patch_capabilities(ReceivePackHandler, (b"no-done",)) super(SmartWebSideBand64kTestCase, self).setUp() def tearDown(self): super(SmartWebSideBand64kTestCase, self).tearDown() UploadPackHandler.capabilities = self.o_uph_cap ReceivePackHandler.capabilities = self.o_rph_cap def _handlers(self): return None # default handlers include side-band-64k def _check_app(self, app): receive_pack_handler_cls = app.handlers[b'git-receive-pack'] caps = receive_pack_handler_cls.capabilities() self.assertIn(b'side-band-64k', caps) self.assertNotIn(b'no-done', caps) class SmartWebSideBand64kNoDoneTestCase(SmartWebTestCase): """Test cases for smart HTTP server with side-band-64k and no-done support. """ # no-done was introduced in git 1.7.4 min_git_version = (1, 7, 4) def _handlers(self): return None # default handlers include side-band-64k def _check_app(self, app): receive_pack_handler_cls = app.handlers[b'git-receive-pack'] caps = receive_pack_handler_cls.capabilities() self.assertIn(b'side-band-64k', caps) self.assertIn(b'no-done', caps) @skipIf(sys.platform == 'win32', 'Broken on windows, with very long fail time.') class DumbWebTestCase(WebTests, CompatTestCase): """Test cases for dumb HTTP server.""" def _make_app(self, backend): return make_wsgi_chain(backend, dumb=True) def test_push_to_dulwich(self): # Note: remove this if dulwich implements dumb web pushing. raise SkipTest('Dumb web pushing not supported.') def test_push_to_dulwich_remove_branch(self): # Note: remove this if dumb pushing is supported raise SkipTest('Dumb web pushing not supported.') def test_new_shallow_clone_from_dulwich(self): # Note: remove this if C git and dulwich implement dumb web shallow # clones. raise SkipTest('Dumb web shallow cloning not supported.') def test_shallow_clone_from_git_is_identical(self): # Note: remove this if C git and dulwich implement dumb web shallow # clones. raise SkipTest('Dumb web shallow cloning not supported.') def test_fetch_same_depth_into_shallow_clone_from_dulwich(self): # Note: remove this if C git and dulwich implement dumb web shallow # clones. raise SkipTest('Dumb web shallow cloning not supported.') def test_fetch_full_depth_into_shallow_clone_from_dulwich(self): # Note: remove this if C git and dulwich implement dumb web shallow # clones. raise SkipTest('Dumb web shallow cloning not supported.') def test_push_to_dulwich_issue_88_standard(self): raise SkipTest('Dumb web pushing not supported.') diff --git a/dulwich/tests/compat/utils.py b/dulwich/tests/compat/utils.py index 4173dbb4..29bda400 100644 --- a/dulwich/tests/compat/utils.py +++ b/dulwich/tests/compat/utils.py @@ -1,265 +1,265 @@ # utils.py -- Git compatibility utilities # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Utilities for interacting with cgit.""" import errno import functools import os import shutil import socket import stat import subprocess import sys import tempfile import time from typing import Tuple from dulwich.repo import Repo from dulwich.protocol import TCP_GIT_PORT from dulwich.tests import ( SkipTest, TestCase, ) _DEFAULT_GIT = 'git' _VERSION_LEN = 4 _REPOS_DATA_DIR = os.path.abspath(os.path.join( os.path.dirname(__file__), os.pardir, 'data', 'repos')) def git_version(git_path=_DEFAULT_GIT): """Attempt to determine the version of git currently installed. Args: git_path: Path to the git executable; defaults to the version in the system path. Returns: A tuple of ints of the form (major, minor, point, sub-point), or None if no git installation was found. """ try: output = run_git_or_fail(['--version'], git_path=git_path) except OSError: return None version_prefix = b'git version ' if not output.startswith(version_prefix): return None parts = output[len(version_prefix):].split(b'.') nums = [] for part in parts: try: nums.append(int(part)) except ValueError: break while len(nums) < _VERSION_LEN: nums.append(0) return tuple(nums[:_VERSION_LEN]) def require_git_version(required_version, git_path=_DEFAULT_GIT): """Require git version >= version, or skip the calling test. Args: required_version: A tuple of ints of the form (major, minor, point, sub-point); ommitted components default to 0. git_path: Path to the git executable; defaults to the version in the system path. Raises: ValueError: if the required version tuple has too many parts. SkipTest: if no suitable git version was found at the given path. """ found_version = git_version(git_path=git_path) if found_version is None: raise SkipTest('Test requires git >= %s, but c git not found' % (required_version, )) if len(required_version) > _VERSION_LEN: raise ValueError('Invalid version tuple %s, expected %i parts' % (required_version, _VERSION_LEN)) required_version = list(required_version) while len(found_version) < len(required_version): required_version.append(0) required_version = tuple(required_version) if found_version < required_version: required_version = '.'.join(map(str, required_version)) found_version = '.'.join(map(str, found_version)) raise SkipTest('Test requires git >= %s, found %s' % (required_version, found_version)) def run_git(args, git_path=_DEFAULT_GIT, input=None, capture_stdout=False, **popen_kwargs): """Run a git command. Input is piped from the input parameter and output is sent to the standard streams, unless capture_stdout is set. Args: args: A list of args to the git command. git_path: Path to to the git executable. input: Input data to be sent to stdin. capture_stdout: Whether to capture and return stdout. popen_kwargs: Additional kwargs for subprocess.Popen; stdin/stdout args are ignored. Returns: A tuple of (returncode, stdout contents). If capture_stdout is False, None will be returned as stdout contents. Raises: OSError: if the git executable was not found. """ env = popen_kwargs.pop('env', {}) env['LC_ALL'] = env['LANG'] = 'C' args = [git_path] + args popen_kwargs['stdin'] = subprocess.PIPE if capture_stdout: popen_kwargs['stdout'] = subprocess.PIPE else: popen_kwargs.pop('stdout', None) p = subprocess.Popen(args, env=env, **popen_kwargs) stdout, stderr = p.communicate(input=input) return (p.returncode, stdout) def run_git_or_fail(args, git_path=_DEFAULT_GIT, input=None, **popen_kwargs): """Run a git command, capture stdout/stderr, and fail if git fails.""" if 'stderr' not in popen_kwargs: popen_kwargs['stderr'] = subprocess.STDOUT returncode, stdout = run_git(args, git_path=git_path, input=input, capture_stdout=True, **popen_kwargs) if returncode != 0: raise AssertionError("git with args %r failed with %d: %r" % ( args, returncode, stdout)) return stdout def import_repo_to_dir(name): """Import a repo from a fast-export file in a temporary directory. These are used rather than binary repos for compat tests because they are more compact and human-editable, and we already depend on git. Args: name: The name of the repository export file, relative to dulwich/tests/data/repos. Returns: The path to the imported repository. """ temp_dir = tempfile.mkdtemp() export_path = os.path.join(_REPOS_DATA_DIR, name) temp_repo_dir = os.path.join(temp_dir, name) export_file = open(export_path, 'rb') run_git_or_fail(['init', '--quiet', '--bare', temp_repo_dir]) run_git_or_fail(['fast-import'], input=export_file.read(), cwd=temp_repo_dir) export_file.close() return temp_repo_dir def check_for_daemon(limit=10, delay=0.1, timeout=0.1, port=TCP_GIT_PORT): """Check for a running TCP daemon. Defaults to checking 10 times with a delay of 0.1 sec between tries. Args: limit: Number of attempts before deciding no daemon is running. delay: Delay between connection attempts. timeout: Socket timeout for connection attempts. port: Port on which we expect the daemon to appear. Returns: A boolean, true if a daemon is running on the specified port, false if not. """ for _ in range(limit): time.sleep(delay) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(delay) try: s.connect(('localhost', port)) return True except socket.timeout: pass except socket.error as e: if getattr(e, 'errno', False) and e.errno != errno.ECONNREFUSED: raise elif e.args[0] != errno.ECONNREFUSED: raise finally: s.close() return False class CompatTestCase(TestCase): """Test case that requires git for compatibility checks. Subclasses can change the git version required by overriding min_git_version. """ - min_git_version:Tuple[int, ...] = (1, 5, 0) + min_git_version: Tuple[int, ...] = (1, 5, 0) def setUp(self): super(CompatTestCase, self).setUp() require_git_version(self.min_git_version) def assertObjectStoreEqual(self, store1, store2): self.assertEqual(sorted(set(store1)), sorted(set(store2))) def assertReposEqual(self, repo1, repo2): self.assertEqual(repo1.get_refs(), repo2.get_refs()) self.assertObjectStoreEqual(repo1.object_store, repo2.object_store) def assertReposNotEqual(self, repo1, repo2): refs1 = repo1.get_refs() objs1 = set(repo1.object_store) refs2 = repo2.get_refs() objs2 = set(repo2.object_store) self.assertFalse(refs1 == refs2 and objs1 == objs2) def import_repo(self, name): """Import a repo from a fast-export file in a temporary directory. Args: name: The name of the repository export file, relative to dulwich/tests/data/repos. Returns: An initialized Repo object that lives in a temporary directory. """ path = import_repo_to_dir(name) repo = Repo(path) def cleanup(): repo.close() rmtree_ro(os.path.dirname(path.rstrip(os.sep))) self.addCleanup(cleanup) return repo if sys.platform == 'win32': def remove_ro(action, name, exc): os.chmod(name, stat.S_IWRITE) os.remove(name) rmtree_ro = functools.partial(shutil.rmtree, onerror=remove_ro) else: rmtree_ro = shutil.rmtree diff --git a/dulwich/tests/test_web.py b/dulwich/tests/test_web.py index ec2d1042..fd079eda 100644 --- a/dulwich/tests/test_web.py +++ b/dulwich/tests/test_web.py @@ -1,573 +1,573 @@ # test_web.py -- Tests for the git HTTP server # Copyright (C) 2010 Google, Inc. # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for the Git HTTP server.""" from io import BytesIO import gzip import re import os from typing import Type from dulwich.object_store import ( MemoryObjectStore, ) from dulwich.objects import ( Blob, ) from dulwich.repo import ( BaseRepo, MemoryRepo, ) from dulwich.server import ( DictBackend, ) from dulwich.tests import ( TestCase, ) from dulwich.web import ( HTTP_OK, HTTP_NOT_FOUND, HTTP_FORBIDDEN, HTTP_ERROR, GunzipFilter, send_file, get_text_file, get_loose_object, get_pack_file, get_idx_file, get_info_refs, get_info_packs, handle_service_request, _LengthLimitedFile, HTTPGitRequest, HTTPGitApplication, ) from dulwich.tests.utils import ( make_object, make_tag, ) class MinimalistWSGIInputStream(object): """WSGI input stream with no 'seek()' and 'tell()' methods.""" def __init__(self, data): self.data = data self.pos = 0 def read(self, howmuch): start = self.pos end = self.pos + howmuch if start >= len(self.data): return '' self.pos = end return self.data[start:end] class MinimalistWSGIInputStream2(MinimalistWSGIInputStream): """WSGI input stream with no *working* 'seek()' and 'tell()' methods.""" def seek(self, pos): raise NotImplementedError def tell(self): raise NotImplementedError class TestHTTPGitRequest(HTTPGitRequest): """HTTPGitRequest with overridden methods to help test caching.""" def __init__(self, *args, **kwargs): HTTPGitRequest.__init__(self, *args, **kwargs) self.cached = None def nocache(self): self.cached = False def cache_forever(self): self.cached = True class WebTestCase(TestCase): """Base TestCase with useful instance vars and utility functions.""" - _req_class:Type[HTTPGitRequest] = TestHTTPGitRequest + _req_class: Type[HTTPGitRequest] = TestHTTPGitRequest def setUp(self): super(WebTestCase, self).setUp() self._environ = {} self._req = self._req_class(self._environ, self._start_response, handlers=self._handlers()) self._status = None self._headers = [] self._output = BytesIO() def _start_response(self, status, headers): self._status = status self._headers = list(headers) return self._output.write def _handlers(self): return None def assertContentTypeEquals(self, expected): self.assertTrue(('Content-Type', expected) in self._headers) def _test_backend(objects, refs=None, named_files=None): if not refs: refs = {} if not named_files: named_files = {} repo = MemoryRepo.init_bare(objects, refs) for path, contents in named_files.items(): repo._put_named_file(path, contents) return DictBackend({'/': repo}) class DumbHandlersTestCase(WebTestCase): def test_send_file_not_found(self): list(send_file(self._req, None, 'text/plain')) self.assertEqual(HTTP_NOT_FOUND, self._status) def test_send_file(self): f = BytesIO(b'foobar') output = b''.join(send_file(self._req, f, 'some/thing')) self.assertEqual(b'foobar', output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('some/thing') self.assertTrue(f.closed) def test_send_file_buffered(self): bufsize = 10240 xs = b'x' * bufsize f = BytesIO(2 * xs) self.assertEqual([xs, xs], list(send_file(self._req, f, 'some/thing'))) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('some/thing') self.assertTrue(f.closed) def test_send_file_error(self): class TestFile(object): def __init__(self, exc_class): self.closed = False self._exc_class = exc_class def read(self, size=-1): raise self._exc_class() def close(self): self.closed = True f = TestFile(IOError) list(send_file(self._req, f, 'some/thing')) self.assertEqual(HTTP_ERROR, self._status) self.assertTrue(f.closed) self.assertFalse(self._req.cached) # non-IOErrors are reraised f = TestFile(AttributeError) self.assertRaises(AttributeError, list, send_file(self._req, f, 'some/thing')) self.assertTrue(f.closed) self.assertFalse(self._req.cached) def test_get_text_file(self): backend = _test_backend([], named_files={'description': b'foo'}) mat = re.search('.*', 'description') output = b''.join(get_text_file(self._req, backend, mat)) self.assertEqual(b'foo', output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('text/plain') self.assertFalse(self._req.cached) def test_get_loose_object(self): blob = make_object(Blob, data=b'foo') backend = _test_backend([blob]) mat = re.search('^(..)(.{38})$', blob.id.decode('ascii')) output = b''.join(get_loose_object(self._req, backend, mat)) self.assertEqual(blob.as_legacy_object(), output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('application/x-git-loose-object') self.assertTrue(self._req.cached) def test_get_loose_object_missing(self): mat = re.search('^(..)(.{38})$', '1' * 40) list(get_loose_object(self._req, _test_backend([]), mat)) self.assertEqual(HTTP_NOT_FOUND, self._status) def test_get_loose_object_error(self): blob = make_object(Blob, data=b'foo') backend = _test_backend([blob]) mat = re.search('^(..)(.{38})$', blob.id.decode('ascii')) def as_legacy_object_error(self): raise IOError self.addCleanup( setattr, Blob, 'as_legacy_object', Blob.as_legacy_object) Blob.as_legacy_object = as_legacy_object_error list(get_loose_object(self._req, backend, mat)) self.assertEqual(HTTP_ERROR, self._status) def test_get_pack_file(self): pack_name = os.path.join( 'objects', 'pack', 'pack-%s.pack' % ('1' * 40)) backend = _test_backend([], named_files={pack_name: b'pack contents'}) mat = re.search('.*', pack_name) output = b''.join(get_pack_file(self._req, backend, mat)) self.assertEqual(b'pack contents', output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('application/x-git-packed-objects') self.assertTrue(self._req.cached) def test_get_idx_file(self): idx_name = os.path.join('objects', 'pack', 'pack-%s.idx' % ('1' * 40)) backend = _test_backend([], named_files={idx_name: b'idx contents'}) mat = re.search('.*', idx_name) output = b''.join(get_idx_file(self._req, backend, mat)) self.assertEqual(b'idx contents', output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('application/x-git-packed-objects-toc') self.assertTrue(self._req.cached) def test_get_info_refs(self): self._environ['QUERY_STRING'] = '' blob1 = make_object(Blob, data=b'1') blob2 = make_object(Blob, data=b'2') blob3 = make_object(Blob, data=b'3') tag1 = make_tag(blob2, name=b'tag-tag') objects = [blob1, blob2, blob3, tag1] refs = { b'HEAD': b'000', b'refs/heads/master': blob1.id, b'refs/tags/tag-tag': tag1.id, b'refs/tags/blob-tag': blob3.id, } backend = _test_backend(objects, refs=refs) mat = re.search('.*', '//info/refs') self.assertEqual([blob1.id + b'\trefs/heads/master\n', blob3.id + b'\trefs/tags/blob-tag\n', tag1.id + b'\trefs/tags/tag-tag\n', blob2.id + b'\trefs/tags/tag-tag^{}\n'], list(get_info_refs(self._req, backend, mat))) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('text/plain') self.assertFalse(self._req.cached) def test_get_info_refs_not_found(self): self._environ['QUERY_STRING'] = '' objects = [] refs = {} backend = _test_backend(objects, refs=refs) mat = re.search('info/refs', '/foo/info/refs') self.assertEqual( [b'No git repository was found at /foo'], list(get_info_refs(self._req, backend, mat))) self.assertEqual(HTTP_NOT_FOUND, self._status) self.assertContentTypeEquals('text/plain') def test_get_info_packs(self): class TestPackData(object): def __init__(self, sha): self.filename = "pack-%s.pack" % sha class TestPack(object): def __init__(self, sha): self.data = TestPackData(sha) packs = [TestPack(str(i) * 40) for i in range(1, 4)] class TestObjectStore(MemoryObjectStore): # property must be overridden, can't be assigned @property def packs(self): return packs store = TestObjectStore() repo = BaseRepo(store, None) backend = DictBackend({'/': repo}) mat = re.search('.*', '//info/packs') output = b''.join(get_info_packs(self._req, backend, mat)) expected = b''.join( [(b'P pack-' + s + b'.pack\n') for s in [b'1' * 40, b'2' * 40, b'3' * 40]]) self.assertEqual(expected, output) self.assertEqual(HTTP_OK, self._status) self.assertContentTypeEquals('text/plain') self.assertFalse(self._req.cached) class SmartHandlersTestCase(WebTestCase): class _TestUploadPackHandler(object): def __init__(self, backend, args, proto, http_req=None, advertise_refs=False): self.args = args self.proto = proto self.http_req = http_req self.advertise_refs = advertise_refs def handle(self): self.proto.write(b'handled input: ' + self.proto.recv(1024)) def _make_handler(self, *args, **kwargs): self._handler = self._TestUploadPackHandler(*args, **kwargs) return self._handler def _handlers(self): return {b'git-upload-pack': self._make_handler} def test_handle_service_request_unknown(self): mat = re.search('.*', '/git-evil-handler') content = list(handle_service_request(self._req, 'backend', mat)) self.assertEqual(HTTP_FORBIDDEN, self._status) self.assertFalse(b'git-evil-handler' in b"".join(content)) self.assertFalse(self._req.cached) def _run_handle_service_request(self, content_length=None): self._environ['wsgi.input'] = BytesIO(b'foo') if content_length is not None: self._environ['CONTENT_LENGTH'] = content_length mat = re.search('.*', '/git-upload-pack') class Backend(object): def open_repository(self, path): return None handler_output = b''.join( handle_service_request(self._req, Backend(), mat)) write_output = self._output.getvalue() # Ensure all output was written via the write callback. self.assertEqual(b'', handler_output) self.assertEqual(b'handled input: foo', write_output) self.assertContentTypeEquals('application/x-git-upload-pack-result') self.assertFalse(self._handler.advertise_refs) self.assertTrue(self._handler.http_req) self.assertFalse(self._req.cached) def test_handle_service_request(self): self._run_handle_service_request() def test_handle_service_request_with_length(self): self._run_handle_service_request(content_length='3') def test_handle_service_request_empty_length(self): self._run_handle_service_request(content_length='') def test_get_info_refs_unknown(self): self._environ['QUERY_STRING'] = 'service=git-evil-handler' class Backend(object): def open_repository(self, url): return None mat = re.search('.*', '/git-evil-pack') content = list(get_info_refs(self._req, Backend(), mat)) self.assertFalse(b'git-evil-handler' in b"".join(content)) self.assertEqual(HTTP_FORBIDDEN, self._status) self.assertFalse(self._req.cached) def test_get_info_refs(self): self._environ['wsgi.input'] = BytesIO(b'foo') self._environ['QUERY_STRING'] = 'service=git-upload-pack' class Backend(object): def open_repository(self, url): return None mat = re.search('.*', '/git-upload-pack') handler_output = b''.join(get_info_refs(self._req, Backend(), mat)) write_output = self._output.getvalue() self.assertEqual((b'001e# service=git-upload-pack\n' b'0000' # input is ignored by the handler b'handled input: '), write_output) # Ensure all output was written via the write callback. self.assertEqual(b'', handler_output) self.assertTrue(self._handler.advertise_refs) self.assertTrue(self._handler.http_req) self.assertFalse(self._req.cached) class LengthLimitedFileTestCase(TestCase): def test_no_cutoff(self): f = _LengthLimitedFile(BytesIO(b'foobar'), 1024) self.assertEqual(b'foobar', f.read()) def test_cutoff(self): f = _LengthLimitedFile(BytesIO(b'foobar'), 3) self.assertEqual(b'foo', f.read()) self.assertEqual(b'', f.read()) def test_multiple_reads(self): f = _LengthLimitedFile(BytesIO(b'foobar'), 3) self.assertEqual(b'fo', f.read(2)) self.assertEqual(b'o', f.read(2)) self.assertEqual(b'', f.read()) class HTTPGitRequestTestCase(WebTestCase): # This class tests the contents of the actual cache headers _req_class = HTTPGitRequest def test_not_found(self): self._req.cache_forever() # cache headers should be discarded message = 'Something not found' self.assertEqual(message.encode('ascii'), self._req.not_found(message)) self.assertEqual(HTTP_NOT_FOUND, self._status) self.assertEqual(set([('Content-Type', 'text/plain')]), set(self._headers)) def test_forbidden(self): self._req.cache_forever() # cache headers should be discarded message = 'Something not found' self.assertEqual(message.encode('ascii'), self._req.forbidden(message)) self.assertEqual(HTTP_FORBIDDEN, self._status) self.assertEqual(set([('Content-Type', 'text/plain')]), set(self._headers)) def test_respond_ok(self): self._req.respond() self.assertEqual([], self._headers) self.assertEqual(HTTP_OK, self._status) def test_respond(self): self._req.nocache() self._req.respond(status=402, content_type='some/type', headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')]) self.assertEqual(set([ ('X-Foo', 'foo'), ('X-Bar', 'bar'), ('Content-Type', 'some/type'), ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'), ('Pragma', 'no-cache'), ('Cache-Control', 'no-cache, max-age=0, must-revalidate'), ]), set(self._headers)) self.assertEqual(402, self._status) class HTTPGitApplicationTestCase(TestCase): def setUp(self): super(HTTPGitApplicationTestCase, self).setUp() self._app = HTTPGitApplication('backend') self._environ = { 'PATH_INFO': '/foo', 'REQUEST_METHOD': 'GET', } def _test_handler(self, req, backend, mat): # tests interface used by all handlers self.assertEqual(self._environ, req.environ) self.assertEqual('backend', backend) self.assertEqual('/foo', mat.group(0)) return 'output' def _add_handler(self, app): req = self._environ['REQUEST_METHOD'] app.services = { (req, re.compile('/foo$')): self._test_handler, } def test_call(self): self._add_handler(self._app) self.assertEqual('output', self._app(self._environ, None)) def test_fallback_app(self): def test_app(environ, start_response): return 'output' app = HTTPGitApplication('backend', fallback_app=test_app) self.assertEqual('output', app(self._environ, None)) class GunzipTestCase(HTTPGitApplicationTestCase): __doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input is correctly decompressed and headers are corrected. """ example_text = __doc__.encode('ascii') def setUp(self): super(GunzipTestCase, self).setUp() self._app = GunzipFilter(self._app) self._environ['HTTP_CONTENT_ENCODING'] = 'gzip' self._environ['REQUEST_METHOD'] = 'POST' def _get_zstream(self, text): zstream = BytesIO() zfile = gzip.GzipFile(fileobj=zstream, mode='w') zfile.write(text) zfile.close() zlength = zstream.tell() zstream.seek(0) return zstream, zlength def _test_call(self, orig, zstream, zlength): self._add_handler(self._app.app) self.assertLess(zlength, len(orig)) self.assertEqual(self._environ['HTTP_CONTENT_ENCODING'], 'gzip') self._environ['CONTENT_LENGTH'] = zlength self._environ['wsgi.input'] = zstream self._app(self._environ, None) buf = self._environ['wsgi.input'] self.assertIsNot(buf, zstream) buf.seek(0) self.assertEqual(orig, buf.read()) self.assertIs(None, self._environ.get('CONTENT_LENGTH')) self.assertNotIn('HTTP_CONTENT_ENCODING', self._environ) def test_call(self): self._test_call( self.example_text, *self._get_zstream(self.example_text) ) def test_call_no_seek(self): """ This ensures that the gunzipping code doesn't require any methods on 'wsgi.input' except for '.read()'. (In particular, it shouldn't require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.) """ zstream, zlength = self._get_zstream(self.example_text) self._test_call( self.example_text, MinimalistWSGIInputStream(zstream.read()), zlength) def test_call_no_working_seek(self): """ Similar to 'test_call_no_seek', but this time the methods are available (but defunct). See https://github.com/jonashaag/klaus/issues/154. """ zstream, zlength = self._get_zstream(self.example_text) self._test_call( self.example_text, MinimalistWSGIInputStream2(zstream.read()), zlength)