def get_boolean(self, section, name, default=None):
    """Retrieve a configuration setting as boolean.

    :param section: Tuple with section name and optional subsection name
    :param name: Name of the setting to look up within ``section``
    :param default: Value returned when the setting is not set
    :return: True or False for a recognised boolean value, or ``default``
        when the setting is not set
    :raise ValueError: if the stored value is neither a bool nor the
        (case-insensitive) byte string ``true`` or ``false``
    """
    try:
        value = self.get(section, name)
    except KeyError:
        return default
    # Values stored through ``set()`` may already be Python booleans, which
    # have no ``lower()`` method; hand those back unchanged.
    if isinstance(value, bool):
        return value
    lowered = value.lower()
    if lowered == b"true":
        return True
    if lowered == b"false":
        return False
    raise ValueError("not a valid boolean string: %r" % value)
class ConfigDict(Config, MutableMapping):
    """Git configuration stored in a dictionary.

    Settings live in a mapping keyed by section tuples, ``(section, )`` or
    ``(section, subsection)``, each mapping setting names to values.
    """

    def __init__(self, values=None):
        """Create a new ConfigDict.

        :param values: Optional mapping used as the backing store; a fresh
            ``OrderedDict`` is created when omitted.
        """
        if values is None:
            values = OrderedDict()
        self._values = values

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._values)

    def __eq__(self, other):
        return (
            isinstance(other, self.__class__) and
            other._values == self._values)

    def __getitem__(self, key):
        return self._values.__getitem__(key)

    def __setitem__(self, key, value):
        return self._values.__setitem__(key, value)

    def __delitem__(self, key):
        return self._values.__delitem__(key)

    def __iter__(self):
        return self._values.__iter__()

    def __len__(self):
        return self._values.__len__()

    @staticmethod
    def _section_tuple(section):
        """Return ``section`` as a tuple, wrapping a bare section name."""
        if not isinstance(section, tuple):
            section = (section, )
        return section

    @classmethod
    def _parse_setting(cls, name):
        """Split a dotted setting name into (section, subsection, name)."""
        parts = name.split(".")
        if len(parts) == 3:
            return (parts[0], parts[1], parts[2])
        else:
            return (parts[0], None, parts[1])

    def get(self, section, name):
        """Retrieve the contents of a configuration setting.

        :param section: Section name, or tuple with section name and
            optional subsection name (all bytes)
        :param name: Name of the setting (bytes)
        :return: Contents of the setting
        :raise TypeError: if a section element or ``name`` is not bytes
        :raise KeyError: if the value is not set
        """
        section = self._section_tuple(section)
        if not all([isinstance(subsection, bytes) for subsection in section]):
            raise TypeError(section)
        if not isinstance(name, bytes):
            raise TypeError(name)
        if len(section) > 1:
            # Prefer the subsection; fall back to the bare section below.
            try:
                return self._values[section][name]
            except KeyError:
                pass
        return self._values[(section[0],)][name]

    def set(self, section, name, value):
        """Set a configuration value.

        :param section: Section name, or tuple with section name and
            optional subsection name
        :param name: Name of the setting (bytes)
        :param value: Value of the setting, either bool or bytes
        :raise TypeError: if ``name`` is not bytes or ``value`` is neither
            bool nor bytes
        """
        section = self._section_tuple(section)
        if not isinstance(name, bytes):
            raise TypeError(name)
        if not isinstance(value, (bool, bytes)):
            raise TypeError(value)
        self._values.setdefault(section, OrderedDict())[name] = value

    def iteritems(self, section):
        """Iterate over the configuration pairs for a specific section.

        :param section: Section name, or tuple with section name and
            optional subsection name
        :return: (name, value) pairs; empty if the section does not exist
        """
        # Normalise like get()/set() do, so a bare section name finds the
        # values that set() stored under the one-element tuple.
        section = self._section_tuple(section)
        return self._values.get(section, OrderedDict()).items()

    def itersections(self):
        """Return the section tuples present in this configuration."""
        return self._values.keys()
_escape_value(value) _ESCAPE_TABLE = { ord(b"\\"): ord(b"\\"), ord(b"\""): ord(b"\""), ord(b"n"): ord(b"\n"), ord(b"t"): ord(b"\t"), ord(b"b"): ord(b"\b"), } _COMMENT_CHARS = [ord(b"#"), ord(b";")] _WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")] def _parse_string(value): value = bytearray(value.strip()) ret = bytearray() whitespace = bytearray() in_quotes = False i = 0 while i < len(value): c = value[i] if c == ord(b"\\"): i += 1 try: v = _ESCAPE_TABLE[value[i]] except IndexError: raise ValueError( "escape character in %r at %d before end of string" % (value, i)) except KeyError: raise ValueError( "escape character followed by unknown character " "%s at %d in %r" % (value[i], i, value)) if whitespace: ret.extend(whitespace) whitespace = bytearray() ret.append(v) elif c == ord(b"\""): in_quotes = (not in_quotes) elif c in _COMMENT_CHARS and not in_quotes: # the rest of the line is a comment break elif c in _WHITESPACE_CHARS: whitespace.append(c) else: if whitespace: ret.extend(whitespace) whitespace = bytearray() ret.append(c) i += 1 if in_quotes: raise ValueError("missing end quote") return bytes(ret) def _escape_value(value): """Escape a value.""" value = value.replace(b"\\", b"\\\\") value = value.replace(b"\n", b"\\n") value = value.replace(b"\t", b"\\t") value = value.replace(b"\"", b"\\\"") return value def _check_variable_name(name): for i in range(len(name)): c = name[i:i+1] if not c.isalnum() and c != b'-': return False return True def _check_section_name(name): for i in range(len(name)): c = name[i:i+1] if not c.isalnum() and c not in (b'-', b'.'): return False return True def _strip_comments(line): comment_bytes = {ord(b"#"), ord(b";")} quote = ord(b'"') string_open = False # Normalize line to bytearray for simple 2/3 compatibility for i, character in enumerate(bytearray(line)): # Comment characters outside balanced quotes denote comment start if character == quote: string_open = not string_open elif not string_open and character in comment_bytes: return 
class ConfigFile(ConfigDict):
    """A Git configuration file, like .git/config or ~/.gitconfig.

    Adds parsing from and serialising to the textual file format on top of
    the dictionary storage inherited from ConfigDict.
    """

    @classmethod
    def from_file(cls, f):
        """Read configuration from a file-like object.

        :param f: File-like object yielding bytes lines via ``readlines()``
        :return: A new instance populated with the parsed sections
        :raise ValueError: on a malformed section header, an invalid section
            or variable name, or a setting that appears before any section
        """
        ret = cls()
        # Section tuple the following settings belong to.
        section = None
        # Name of a setting whose value continues on the next line, or None.
        setting = None
        for lineno, line in enumerate(f.readlines()):
            line = line.lstrip()
            if setting is None:
                # Parse section header ("[bla]")
                if len(line) > 0 and line[:1] == b"[":
                    line = _strip_comments(line).rstrip()
                    try:
                        last = line.index(b"]")
                    except ValueError:
                        raise ValueError("expected trailing ]")
                    pts = line[1:last].split(b" ", 1)
                    # Whatever follows the header is parsed as a setting.
                    line = line[last+1:]
                    pts[0] = pts[0].lower()
                    if len(pts) == 2:
                        # [section "subsection"] form
                        if pts[1][:1] != b"\"" or pts[1][-1:] != b"\"":
                            raise ValueError(
                                "Invalid subsection %r" % pts[1])
                        else:
                            pts[1] = pts[1][1:-1]
                        if not _check_section_name(pts[0]):
                            raise ValueError("invalid section name %r" %
                                             pts[0])
                        section = (pts[0], pts[1])
                    else:
                        if not _check_section_name(pts[0]):
                            raise ValueError(
                                "invalid section name %r" % pts[0])
                        # [section.subsection] form
                        pts = pts[0].split(b".", 1)
                        if len(pts) == 2:
                            section = (pts[0], pts[1])
                        else:
                            section = (pts[0], )
                    ret._values[section] = OrderedDict()
                if _strip_comments(line).strip() == b"":
                    continue
                if section is None:
                    raise ValueError("setting %r without section" % line)
                try:
                    setting, value = line.split(b"=", 1)
                except ValueError:
                    # A name without "=" gets the value b"true".
                    setting = line
                    value = b"true"
                setting = setting.strip().lower()
                if not _check_variable_name(setting):
                    raise ValueError("invalid variable name %s" % setting)
                if value.endswith(b"\\\n"):
                    # Backslash-newline: keep `setting` so that the next
                    # line is handled as a continuation.
                    continuation = value[:-2]
                else:
                    continuation = None
                    value = _parse_string(value)
                    ret._values[section][setting] = value
                    setting = None
            else:  # continuation line
                if line.endswith(b"\\\n"):
                    continuation += line[:-2]
                else:
                    continuation += line
                    value = _parse_string(continuation)
                    ret._values[section][setting] = value
                    continuation = None
                    setting = None
        return ret

    @classmethod
    def from_path(cls, path):
        """Read configuration from a file on disk.

        :param path: Path of the file to read
        :return: A new instance with its ``path`` attribute set to ``path``
        """
        with GitFile(path, 'rb') as f:
            ret = cls.from_file(f)
            ret.path = path
            return ret

    def write_to_path(self, path=None):
        """Write configuration to a file on disk.

        :param path: Path to write to; defaults to ``self.path``, which is
            set by ``from_path``
        """
        if path is None:
            path = self.path
        with GitFile(path, 'wb') as f:
            self.write_to_file(f)

    def write_to_file(self, f):
        """Write configuration to a file-like object.

        :param f: File-like object with a ``write`` method accepting bytes
        """
        for section, values in self._values.items():
            # Section keys are either (name, ) or (name, subsection).
            try:
                section_name, subsection_name = section
            except ValueError:
                (section_name, ) = section
                subsection_name = None
            if subsection_name is None:
                f.write(b"[" + section_name + b"]\n")
            else:
                f.write(b"[" + section_name +
                        b" \"" + subsection_name + b"\"]\n")
            for key, value in values.items():
                # Python booleans are written as the literals true/false.
                if value is True:
                    value = b"true"
                elif value is False:
                    value = b"false"
                else:
                    value = _format_string(value)
                f.write(b"\t" + key + b" = " + value + b"\n")
def parse_submodules(config):
    """Parse a gitmodules GitConfig file, returning submodules.

    Sections that are not of the form ``[submodule "name"]`` are skipped.

    :param config: A `ConfigFile`
    :return: iterator over tuples (submodule path, url, name), where name
        is the quoted part of the section's name.
    :raise KeyError: if a submodule section lacks ``path`` or ``url``
    """
    for section in config.keys():
        # Sections without a subsection are one-element tuples; unpacking
        # them into two names would raise ValueError.
        if len(section) != 2:
            continue
        section_kind, section_name = section
        if section_kind == b'submodule':
            sm_path = config.get(section, b'path')
            sm_url = config.get(section, b'url')
            yield (sm_path, sm_url, section_name)
def _translate_segment(segment):
    """Translate one slash-free pattern segment to a regular expression.

    :param segment: Pattern segment as bytes, without any ``/``
    :return: Regular expression fragment as bytes
    """
    if segment == b"*":
        return b'[^/]+'
    res = b""
    i, n = 0, len(segment)
    while i < n:
        c = segment[i:i+1]
        i = i+1
        if c == b'*':
            res += b'[^/]*'
        elif c == b'?':
            # "?" matches exactly one character other than "/"; a plain
            # "." would also match "/" because DOTALL is enabled.
            res += b'[^/]'
        elif c == b'[':
            # Find the end of the character class; a "]" directly after
            # "[" or "[!" is a literal member, not the terminator.
            j = i
            if j < n and segment[j:j+1] == b'!':
                j = j+1
            if j < n and segment[j:j+1] == b']':
                j = j+1
            while j < n and segment[j:j+1] != b']':
                j = j+1
            if j >= n:
                # Unterminated class: treat "[" literally.
                res += b'\\['
            else:
                stuff = segment[i:j].replace(b'\\', b'\\\\')
                i = j+1
                if stuff.startswith(b'!'):
                    stuff = b'^' + stuff[1:]
                elif stuff.startswith(b'^'):
                    stuff = b'\\' + stuff
                res += b'[' + stuff + b']'
        else:
            res += re.escape(c)
    return res


def translate(pat):
    """Translate a shell PATTERN to a regular expression.

    There is no way to quote meta-characters.

    Originally copied from fnmatch in Python 2.7, but modified for Dulwich
    to cope with features in Git ignore patterns.

    :param pat: Ignore pattern as bytes
    :return: Regular expression source as bytes
    """
    res = b'(?ms)'

    if b'/' not in pat[:-1]:
        # If there's no slash, this is a filename-based match
        res += b'(.*/)?'

    if pat.startswith(b'**/'):
        # Leading **/
        pat = pat[2:]
        res += b'(.*/)?'

    if pat.startswith(b'/'):
        pat = pat[1:]

    for i, segment in enumerate(pat.split(b'/')):
        if segment == b'**':
            res += b'(/.*)?'
            continue
        else:
            res += ((re.escape(b'/') if i > 0 else b'') +
                    _translate_segment(segment))

    if not pat.endswith(b'/'):
        res += b'/?'

    # Spelled with an escaped backslash: "\Z" is not a valid escape in a
    # non-raw bytes literal.
    return res + b'\\Z'
def match_pattern(path, pattern, ignorecase=False):
    """Check whether a path matches a single gitignore-style pattern.

    :param path: Path to match
    :param pattern: Pattern to match against
    :param ignorecase: Whether to ignore case when matching
    :return: bool indicating whether the pattern matched
    """
    compiled = Pattern(pattern, ignorecase)
    return compiled.match(path)
def default_user_ignore_filter_path(config):
    """Return default user ignore filter path.

    The ``core.excludesFile`` setting is used when present, with a leading
    ``~`` expanded to the user's home directory; otherwise the path of the
    ``git/ignore`` file below ``$XDG_CONFIG_HOME`` (default ``~/.config``)
    is returned.

    :param config: A Config object
    :return: Path to a global ignore file
    """
    # ConfigFile lower-cases setting names while parsing, so try the
    # lower-case spelling first; the camel-case name is still tried for
    # configurations populated programmatically.
    for name in (b'excludesfile', b'excludesFile'):
        try:
            path = config.get((b'core', ), name)
        except KeyError:
            continue
        return os.path.expanduser(path)

    xdg_config_home = os.environ.get(
        "XDG_CONFIG_HOME", os.path.expanduser("~/.config/"),
    )
    return os.path.join(xdg_config_home, 'git', 'ignore')
:param path: Path to check :return: None if the file is not mentioned, True if it is included, False if it is explicitly excluded. """ matches = list(self.find_matching(path)) if matches: return matches[-1].is_exclude return None @classmethod def from_repo(cls, repo): """Create a IgnoreFilterManager from a repository. :param repo: Repository object :return: A `IgnoreFilterManager` object """ global_filters = [] for p in [ os.path.join(repo.controldir(), 'info', 'exclude'), default_user_ignore_filter_path(repo.get_config_stack())]: try: global_filters.append(IgnoreFilter.from_path(p)) except IOError: pass config = repo.get_config_stack() ignorecase = config.get_boolean((b'core'), (b'ignorecase'), False) return cls(repo.path, global_filters, ignorecase) diff --git a/dulwich/repo.py b/dulwich/repo.py index 2ca70f96..25b6c2fc 100644 --- a/dulwich/repo.py +++ b/dulwich/repo.py @@ -1,1166 +1,1166 @@ # repo.py -- For dealing with git repositories. # Copyright (C) 2007 James Westby # Copyright (C) 2008-2013 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Repository access. This module contains the base class for git repositories (BaseRepo) and an implementation which uses a repository on local disk (Repo). 
def parse_graftpoints(graftpoints):
    """Convert a list of graftpoints into a dict

    :param graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha> [<parent sha>]*

    Resulting dictionary is:
        <commit sha>: [<parent sha>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts = {}
    for line in graftpoints:
        # First field is the commit, the rest are its parents.
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate the commit and every parent before recording the graft.
        for sha in [commit] + parents:
            check_hexsha(sha, 'Invalid graftpoint')

        grafts[commit] = parents
    return grafts
b'\n'.join(graft_lines) class BaseRepo(object): """Base class for a git repository. :ivar object_store: Dictionary-like object for accessing the objects :ivar refs: Dictionary-like object with the refs in this repository """ def __init__(self, object_store, refs): """Open a repository. This shouldn't be called directly, but rather through one of the base classes, such as MemoryRepo or Repo. :param object_store: Object store to use :param refs: Refs container to use """ self.object_store = object_store self.refs = refs self._graftpoints = {} self.hooks = {} def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ raise NotImplementedError(self._determine_file_mode) def _init_files(self, bare): """Initialize a default set of named files.""" from dulwich.config import ConfigFile self._put_named_file('description', b"Unnamed repository") f = BytesIO() cf = ConfigFile() cf.set(b"core", b"repositoryformatversion", b"0") if self._determine_file_mode(): cf.set(b"core", b"filemode", True) else: cf.set(b"core", b"filemode", False) cf.set(b"core", b"bare", bare) cf.set(b"core", b"logallrefupdates", True) cf.write_to_file(f) self._put_named_file('config', f.getvalue()) self._put_named_file(os.path.join('info', 'exclude'), b'') def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ raise NotImplementedError(self.get_named_file) def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. 
:param contents: A string to write to the file. """ raise NotImplementedError(self._put_named_file) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ raise NotImplementedError(self.open_index) def fetch(self, target, determine_wants=None, progress=None): """Fetch objects into another repository. :param target: The target repository :param determine_wants: Optional function to determine what refs to fetch. :param progress: Optional progress function :return: The local refs """ if determine_wants is None: determine_wants = target.object_store.determine_wants_all target.object_store.add_objects( self.fetch_objects(determine_wants, target.get_graph_walker(), progress)) return self.get_refs() def fetch_objects(self, determine_wants, graph_walker, progress, get_tagged=None): """Fetch the missing objects required for a set of revisions. :param determine_wants: Function that takes a dictionary with heads and returns the list of heads to fetch. :param graph_walker: Object that can iterate over the list of revisions to fetch and has an "ack" method that will be called to acknowledge that a revision is present. :param progress: Simple progress function that will be called with updated progress strings. :param get_tagged: Function that returns a dict of pointed-to sha -> tag sha for including tags. :return: iterator over objects, with __len__ implemented """ wants = determine_wants(self.get_refs()) if not isinstance(wants, list): raise TypeError("determine_wants() did not return a list") shallows = getattr(graph_walker, 'shallow', frozenset()) unshallows = getattr(graph_walker, 'unshallow', frozenset()) if wants == []: # TODO(dborowitz): find a way to short-circuit that doesn't change # this interface. 
if shallows or unshallows: # Do not send a pack in shallow short-circuit path return None return [] # If the graph walker is set up with an implementation that can # ACK/NAK to the wire, it will write data to the client through # this call as a side-effect. haves = self.object_store.find_common_revisions(graph_walker) # Deal with shallow requests separately because the haves do # not reflect what objects are missing if shallows or unshallows: # TODO: filter the haves commits from iter_shas. the specific # commits aren't missing. haves = [] def get_parents(commit): if commit.id in shallows: return [] return self.get_parents(commit.id, commit) return self.object_store.iter_shas( self.object_store.find_missing_objects( haves, wants, progress, get_tagged, get_parents=get_parents)) def get_graph_walker(self, heads=None): """Retrieve a graph walker. A graph walker is used by a remote repository (or proxy) to find out which objects are present in this repository. :param heads: Repository heads to use (optional) :return: A graph walker object """ if heads is None: heads = self.refs.as_dict(b'refs/heads').values() return ObjectStoreGraphWalker(heads, self.get_parents) def get_refs(self): """Get dictionary with all refs. :return: A ``dict`` mapping ref names to SHA1s """ return self.refs.as_dict() def head(self): """Return the SHA1 pointed at by HEAD.""" return self.refs[b'HEAD'] def _get_object(self, sha, cls): assert len(sha) in (20, 40) ret = self.get_object(sha) if not isinstance(ret, cls): if cls is Commit: raise NotCommitError(ret) elif cls is Blob: raise NotBlobError(ret) elif cls is Tree: raise NotTreeError(ret) elif cls is Tag: raise NotTagError(ret) else: raise Exception("Type invalid: %r != %r" % ( ret.type_name, cls.type_name)) return ret def get_object(self, sha): """Retrieve the object with the specified SHA. 
:param sha: SHA to retrieve :return: A ShaFile object :raise KeyError: when the object can not be found """ return self.object_store[sha] def get_parents(self, sha, commit=None): """Retrieve the parents of a specific commit. If the specific commit is a graftpoint, the graft parents will be returned instead. :param sha: SHA of the commit for which to retrieve the parents :param commit: Optional commit matching the sha :return: List of parents """ try: return self._graftpoints[sha] except KeyError: if commit is None: commit = self[sha] return commit.parents def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ raise NotImplementedError(self.get_config) def get_description(self): """Retrieve the description for this repository. :return: String with the description of the repository as set by the user. """ raise NotImplementedError(self.get_description) def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ raise NotImplementedError(self.set_description) def get_config_stack(self): """Return a config stack for this repository. This stack accesses the configuration for both this repository itself (.git/config) and the global configuration, which usually lives in ~/.gitconfig. :return: `Config` instance for this repository """ from dulwich.config import StackedConfig backends = [self.get_config()] + StackedConfig.default_backends() return StackedConfig(backends, writable=backends[0]) def get_peeled(self, ref): """Get the peeled value of a ref. :param ref: The refname to peel. :return: The fully-peeled SHA1 of a tag object, after peeling all intermediate tags; if the original ref does not point to a tag, this will equal the original SHA1. 
""" cached = self.refs.get_peeled(ref) if cached is not None: return cached return self.object_store.peel_sha(self.refs[ref]).id def get_walker(self, include=None, *args, **kwargs): """Obtain a walker for this repository. :param include: Iterable of SHAs of commits to include along with their ancestors. Defaults to [HEAD] :param exclude: Iterable of SHAs of commits to exclude along with their ancestors, overriding includes. :param order: ORDER_* constant specifying the order of results. Anything other than ORDER_DATE may result in O(n) memory usage. :param reverse: If True, reverse the order of output, requiring O(n) memory. :param max_entries: The maximum number of entries to yield, or None for no limit. :param paths: Iterable of file or subtree paths to show entries for. :param rename_detector: diff.RenameDetector object for detecting renames. :param follow: If True, follow path across renames/copies. Forces a default rename_detector. :param since: Timestamp to list commits after. :param until: Timestamp to list commits before. :param queue_cls: A class to use for a queue of commits, supporting the iterator protocol. The constructor takes a single argument, the Walker. :return: A `Walker` object """ from dulwich.walk import Walker if include is None: include = [self.head()] if isinstance(include, str): include = [include] kwargs['get_parents'] = lambda commit: self.get_parents( commit.id, commit) return Walker(self.object_store, include, *args, **kwargs) def __getitem__(self, name): """Retrieve a Git object by SHA1 or ref. 
:param name: A Git object SHA1 or a ref name :return: A `ShaFile` object, such as a Commit or Blob :raise KeyError: when the specified ref or object does not exist """ if not isinstance(name, bytes): raise TypeError("'name' must be bytestring, not %.80s" % type(name).__name__) if len(name) in (20, 40): try: return self.object_store[name] except (KeyError, ValueError): pass try: return self.object_store[self.refs[name]] except RefFormatError: raise KeyError(name) def __contains__(self, name): """Check if a specific Git object or ref is present. :param name: Git object SHA1 or ref name """ if len(name) in (20, 40): return name in self.object_store or name in self.refs else: return name in self.refs def __setitem__(self, name, value): """Set a ref. :param name: ref name :param value: Ref value - either a ShaFile object, or a hex sha """ if name.startswith(b"refs/") or name == b'HEAD': if isinstance(value, ShaFile): self.refs[name] = value.id elif isinstance(value, bytes): self.refs[name] = value else: raise TypeError(value) else: raise ValueError(name) def __delitem__(self, name): """Remove a ref. :param name: Name of the ref to remove """ if name.startswith(b"refs/") or name == b"HEAD": del self.refs[name] else: raise ValueError(name) def _get_user_identity(self): """Determine the identity to use for new commits. 
""" config = self.get_config_stack() return (config.get((b"user", ), b"name") + b" <" + config.get((b"user", ), b"email") + b">") def _add_graftpoints(self, updated_graftpoints): """Add or modify graftpoints :param updated_graftpoints: Dict of commit shas to list of parent shas """ # Simple validation for commit, parents in updated_graftpoints.items(): for sha in [commit] + parents: check_hexsha(sha, 'Invalid graftpoint') self._graftpoints.update(updated_graftpoints) def _remove_graftpoints(self, to_remove=[]): """Remove graftpoints :param to_remove: List of commit shas """ for sha in to_remove: del self._graftpoints[sha] def do_commit(self, message=None, committer=None, author=None, commit_timestamp=None, commit_timezone=None, author_timestamp=None, author_timezone=None, tree=None, encoding=None, ref=b'HEAD', merge_heads=None): """Create a new commit. :param message: Commit message :param committer: Committer fullname :param author: Author fullname (defaults to committer) :param commit_timestamp: Commit timestamp (defaults to now) :param commit_timezone: Commit timestamp timezone (defaults to GMT) :param author_timestamp: Author timestamp (defaults to commit timestamp) :param author_timezone: Author timestamp timezone (defaults to commit timestamp timezone) :param tree: SHA1 of the tree root to use (if not specified the current index will be committed). 
:param encoding: Encoding :param ref: Optional ref to commit to (defaults to current branch) :param merge_heads: Merge heads (defaults to .git/MERGE_HEADS) :return: New commit SHA1 """ import time c = Commit() if tree is None: index = self.open_index() c.tree = index.commit(self.object_store) else: if len(tree) != 40: raise ValueError("tree must be a 40-byte hex sha string") c.tree = tree try: self.hooks['pre-commit'].execute() except HookError as e: raise CommitError(e) except KeyError: # no hook defined, silent fallthrough pass if merge_heads is None: # FIXME: Read merge heads from .git/MERGE_HEADS merge_heads = [] if committer is None: # FIXME: Support GIT_COMMITTER_NAME/GIT_COMMITTER_EMAIL environment # variables committer = self._get_user_identity() c.committer = committer if commit_timestamp is None: # FIXME: Support GIT_COMMITTER_DATE environment variable commit_timestamp = time.time() c.commit_time = int(commit_timestamp) if commit_timezone is None: # FIXME: Use current user timezone rather than UTC commit_timezone = 0 c.commit_timezone = commit_timezone if author is None: # FIXME: Support GIT_AUTHOR_NAME/GIT_AUTHOR_EMAIL environment # variables author = committer c.author = author if author_timestamp is None: # FIXME: Support GIT_AUTHOR_DATE environment variable author_timestamp = commit_timestamp c.author_time = int(author_timestamp) if author_timezone is None: author_timezone = commit_timezone c.author_timezone = author_timezone if encoding is not None: c.encoding = encoding if message is None: # FIXME: Try to read commit message from .git/MERGE_MSG raise ValueError("No commit message specified") try: c.message = self.hooks['commit-msg'].execute(message) if c.message is None: c.message = message except HookError as e: raise CommitError(e) except KeyError: # no hook defined, message not modified c.message = message if ref is None: # Create a dangling commit c.parents = merge_heads self.object_store.add_object(c) else: try: old_head = self.refs[ref] 
c.parents = [old_head] + merge_heads self.object_store.add_object(c) ok = self.refs.set_if_equals(ref, old_head, c.id) except KeyError: c.parents = merge_heads self.object_store.add_object(c) ok = self.refs.add_if_new(ref, c.id) if not ok: # Fail if the atomic compare-and-swap failed, leaving the # commit and all its objects as garbage. raise CommitError("%s changed during commit" % (ref,)) try: self.hooks['post-commit'].execute() except HookError as e: # silent failure warnings.warn("post-commit hook failed: %s" % e, UserWarning) except KeyError: # no hook defined, silent fallthrough pass return c.id def read_gitfile(f): """Read a ``.git`` file. The first line of the file should start with "gitdir: " :param f: File-like object to read from :return: A path """ cs = f.read() if not cs.startswith("gitdir: "): raise ValueError("Expected file to start with 'gitdir: '") return cs[len("gitdir: "):].rstrip("\n") class Repo(BaseRepo): """A git repository backed by local disk. To open an existing repository, call the contructor with the path of the repository. To create a new repository, use the Repo.init class method. 
""" def __init__(self, root): hidden_path = os.path.join(root, CONTROLDIR) if os.path.isdir(os.path.join(hidden_path, OBJECTDIR)): self.bare = False self._controldir = hidden_path elif (os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(os.path.join(root, REFSDIR))): self.bare = True self._controldir = root elif os.path.isfile(hidden_path): self.bare = False with open(hidden_path, 'r') as f: path = read_gitfile(f) self.bare = False self._controldir = os.path.join(root, path) else: raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=root) ) commondir = self.get_named_file(COMMONDIR) if commondir is not None: with commondir: self._commondir = os.path.join( self.controldir(), commondir.read().rstrip(b"\r\n").decode( sys.getfilesystemencoding())) else: self._commondir = self._controldir self.path = root object_store = DiskObjectStore( os.path.join(self.commondir(), OBJECTDIR)) refs = DiskRefsContainer(self.commondir(), self._controldir) BaseRepo.__init__(self, object_store, refs) self._graftpoints = {} graft_file = self.get_named_file(os.path.join("info", "grafts"), basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) graft_file = self.get_named_file("shallow", basedir=self.commondir()) if graft_file: with graft_file: self._graftpoints.update(parse_graftpoints(graft_file)) self.hooks['pre-commit'] = PreCommitShellHook(self.controldir()) self.hooks['commit-msg'] = CommitMsgShellHook(self.controldir()) self.hooks['post-commit'] = PostCommitShellHook(self.controldir()) @classmethod def discover(cls, start='.'): """Iterate parent directories to discover a repository Return a Repo object for the first parent directory that looks like a Git repository. 
:param start: The directory to start discovery from (defaults to '.') """ remaining = True path = os.path.abspath(start) while remaining: try: return cls(path) except NotGitRepository: path, remaining = os.path.split(path) raise NotGitRepository( "No git repository was found at %(path)s" % dict(path=start) ) def controldir(self): """Return the path of the control directory.""" return self._controldir def commondir(self): """Return the path of the common directory. For a main working tree, it is identical to controldir(). For a linked working tree, it is the control directory of the main working tree.""" return self._commondir def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ fname = os.path.join(self.path, '.probe-permissions') with open(fname, 'w') as f: f.write('') st1 = os.lstat(fname) os.chmod(fname, st1.st_mode ^ stat.S_IXUSR) st2 = os.lstat(fname) os.unlink(fname) mode_differs = st1.st_mode != st2.st_mode st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0 return mode_differs and st2_has_exec def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ path = path.lstrip(os.path.sep) with GitFile(os.path.join(self.controldir(), path), 'wb') as f: f.write(contents) def get_named_file(self, path, basedir=None): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-based Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :param basedir: Optional argument that specifies an alternative to the control dir. :return: An open file object, or None if the file does not exist. 
""" # TODO(dborowitz): sanitize filenames, since this is used directly by # the dumb web serving code. if basedir is None: basedir = self.controldir() path = path.lstrip(os.path.sep) try: return open(os.path.join(basedir, path), 'rb') except (IOError, OSError) as e: if e.errno == errno.ENOENT: return None raise def index_path(self): """Return path to the index file.""" return os.path.join(self.controldir(), INDEX_FILENAME) def open_index(self): """Open the index for this repository. :raise NoIndexPresent: If no index is present :return: The matching `Index` """ from dulwich.index import Index if not self.has_index(): raise NoIndexPresent() return Index(self.index_path()) def has_index(self): """Check if an index is present.""" # Bare repos must never have index files; non-bare repos may have a # missing index file, which is treated as empty. return not self.bare def stage(self, fs_paths): """Stage a set of paths. :param fs_paths: List of paths, relative to the repository path """ root_path_bytes = self.path.encode(sys.getfilesystemencoding()) if not isinstance(fs_paths, list): fs_paths = [fs_paths] from dulwich.index import ( blob_from_path_and_stat, index_entry_from_stat, _fs_to_tree_path, ) index = self.open_index() for fs_path in fs_paths: if not isinstance(fs_path, bytes): fs_path = fs_path.encode(sys.getfilesystemencoding()) if os.path.isabs(fs_path): raise ValueError( "path %r should be relative to " "repository root, not absolute" % fs_path) tree_path = _fs_to_tree_path(fs_path) full_path = os.path.join(root_path_bytes, fs_path) try: st = os.lstat(full_path) except OSError: # File no longer exists try: del index[tree_path] except KeyError: pass # already removed else: if not stat.S_ISDIR(st.st_mode): blob = blob_from_path_and_stat(full_path, st) self.object_store.add_object(blob) index[tree_path] = index_entry_from_stat(st, blob.id, 0) else: try: del index[tree_path] except KeyError: pass index.write() def clone(self, target_path, mkdir=True, bare=False, 
origin=b"origin"): """Clone this repository. :param target_path: Target path :param mkdir: Create the target directory :param bare: Whether to create a bare repository :param origin: Base name for refs in target repository cloned from this repository :return: Created repository as `Repo` """ if not bare: target = self.init(target_path, mkdir=mkdir) else: target = self.init_bare(target_path, mkdir=mkdir) self.fetch(target) target.refs.import_refs( b'refs/remotes/' + origin, self.refs.as_dict(b'refs/heads')) target.refs.import_refs( b'refs/tags', self.refs.as_dict(b'refs/tags')) try: target.refs.add_if_new(DEFAULT_REF, self.refs[DEFAULT_REF]) except KeyError: pass target_config = target.get_config() encoded_path = self.path if not isinstance(encoded_path, bytes): encoded_path = encoded_path.encode(sys.getfilesystemencoding()) target_config.set((b'remote', b'origin'), b'url', encoded_path) target_config.set((b'remote', b'origin'), b'fetch', b'+refs/heads/*:refs/remotes/origin/*') target_config.write_to_path() # Update target head head_chain, head_sha = self.refs.follow(b'HEAD') if head_chain and head_sha is not None: target.refs.set_symbolic_ref(b'HEAD', head_chain[-1]) target[b'HEAD'] = head_sha if not bare: # Checkout HEAD to target dir target.reset_index() return target def reset_index(self, tree=None): """Reset the index back to a specific tree. :param tree: Tree SHA to reset to, None for current HEAD tree. 
""" from dulwich.index import ( build_index_from_tree, validate_path_element_default, validate_path_element_ntfs, ) if tree is None: tree = self[b'HEAD'].tree config = self.get_config() honor_filemode = config.get_boolean( - 'core', 'filemode', os.name != "nt") - if config.get_boolean('core', 'core.protectNTFS', os.name == "nt"): + b'core', b'filemode', os.name != "nt") + if config.get_boolean(b'core', b'core.protectNTFS', os.name == "nt"): validate_path_element = validate_path_element_ntfs else: validate_path_element = validate_path_element_default return build_index_from_tree( self.path, self.index_path(), self.object_store, tree, honor_filemode=honor_filemode, validate_path_element=validate_path_element) def get_config(self): """Retrieve the config object. :return: `ConfigFile` object for the ``.git/config`` file. """ from dulwich.config import ConfigFile path = os.path.join(self._controldir, 'config') try: return ConfigFile.from_path(path) except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise ret = ConfigFile() ret.path = path return ret def get_description(self): """Retrieve the description of this repository. :return: A string describing the repository or None. """ path = os.path.join(self._controldir, 'description') try: with GitFile(path, 'rb') as f: return f.read() except (IOError, OSError) as e: if e.errno != errno.ENOENT: raise return None def __repr__(self): return "" % self.path def set_description(self, description): """Set the description for this repository. :param description: Text to set as description for this repository. """ self._put_named_file('description', description) @classmethod def _init_maybe_bare(cls, path, bare): for d in BASE_DIRECTORIES: os.mkdir(os.path.join(path, *d)) DiskObjectStore.init(os.path.join(path, OBJECTDIR)) ret = cls(path) ret.refs.set_symbolic_ref(b'HEAD', DEFAULT_REF) ret._init_files(bare) return ret @classmethod def init(cls, path, mkdir=False): """Create a new repository. 
:param path: Path in which to create the repository :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) controldir = os.path.join(path, CONTROLDIR) os.mkdir(controldir) cls._init_maybe_bare(controldir, False) return cls(path) @classmethod def _init_new_working_directory(cls, path, main_repo, identifier=None, mkdir=False): """Create a new working directory linked to a repository. :param path: Path in which to create the working tree. :param main_repo: Main repository to reference :param identifier: Worktree identifier :param mkdir: Whether to create the directory :return: `Repo` instance """ if mkdir: os.mkdir(path) if identifier is None: identifier = os.path.basename(path) main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES) worktree_controldir = os.path.join(main_worktreesdir, identifier) gitdirfile = os.path.join(path, CONTROLDIR) with open(gitdirfile, 'wb') as f: f.write(b'gitdir: ' + worktree_controldir.encode(sys.getfilesystemencoding()) + b'\n') try: os.mkdir(main_worktreesdir) except OSError as e: if e.errno != errno.EEXIST: raise try: os.mkdir(worktree_controldir) except OSError as e: if e.errno != errno.EEXIST: raise with open(os.path.join(worktree_controldir, GITDIR), 'wb') as f: f.write(gitdirfile.encode(sys.getfilesystemencoding()) + b'\n') with open(os.path.join(worktree_controldir, COMMONDIR), 'wb') as f: f.write(b'../..\n') with open(os.path.join(worktree_controldir, 'HEAD'), 'wb') as f: f.write(main_repo.head() + b'\n') r = cls(path) r.reset_index() return r @classmethod def init_bare(cls, path, mkdir=False): """Create a new bare repository. ``path`` should already exist and be an empty directory. 
:param path: Path to create bare repository in :return: a `Repo` instance """ if mkdir: os.mkdir(path) return cls._init_maybe_bare(path, True) create = init_bare def close(self): """Close any files opened by this repository.""" self.object_store.close() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() class MemoryRepo(BaseRepo): """Repo that stores refs, objects, and named files in memory. MemoryRepos are always bare: they have no working tree and no index, since those have a stronger dependency on the filesystem. """ def __init__(self): from dulwich.config import ConfigFile BaseRepo.__init__(self, MemoryObjectStore(), DictRefsContainer({})) self._named_files = {} self.bare = True self._config = ConfigFile() self._description = None def set_description(self, description): self._description = description def get_description(self): return self._description def _determine_file_mode(self): """Probe the file-system to determine whether permissions can be trusted. :return: True if permissions can be trusted, False otherwise. """ return sys.platform != 'win32' def _put_named_file(self, path, contents): """Write a file to the control dir with the given name and contents. :param path: The path to the file, relative to the control dir. :param contents: A string to write to the file. """ self._named_files[path] = contents def get_named_file(self, path): """Get a file from the control dir with a specific name. Although the filename should be interpreted as a filename relative to the control dir in a disk-baked Repo, the object returned need not be pointing to a file in that location. :param path: The path to the file, relative to the control dir. :return: An open file object, or None if the file does not exist. """ contents = self._named_files.get(path, None) if contents is None: return None return BytesIO(contents) def open_index(self): """Fail to open index for this repo, since it is bare. 
:raise NoIndexPresent: Raised when no index is present """ raise NoIndexPresent() def get_config(self): """Retrieve the config object. :return: `ConfigFile` object. """ return self._config @classmethod def init_bare(cls, objects, refs): """Create a new bare repository in memory. :param objects: Objects for the new repository, as iterable :param refs: Refs as dictionary, mapping names to object SHA1s """ ret = cls() for obj in objects: ret.object_store.add_object(obj) for refname, sha in refs.items(): ret.refs[refname] = sha ret._init_files(bare=True) return ret