# config.py - Reading and writing Git config files
# Copyright (C) 2011-2013 Jelmer Vernooij
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Reading and writing Git configuration files.

TODO:
 * preserve formatting when updating configuration files
 * treat subsection names as case-insensitive for [branch.foo] style
   subsections
"""

import errno
import os

from collections import OrderedDict

try:
    # The ABCs were removed from the ``collections`` namespace in Python 3.10.
    from collections.abc import (
        Iterable,
        MutableMapping,
        )
except ImportError:  # Python 2
    from collections import (
        Iterable,
        MutableMapping,
        )

from dulwich.file import GitFile


DEFAULT_ENCODING = 'utf-8'


# Marker meaning "no default supplied"; lets callers pass None explicitly.
SENTINAL = object()


def lower_key(key):
    """Return a lower-cased copy of a dictionary key.

    :param key: A bytes/str object, an iterable of keys (e.g. a section
        tuple), or any other object
    :return: ``key.lower()`` for bytes/str; for other iterables a new
        container of the same type with every element lower-cased
        recursively; any other object is returned unchanged
    """
    if isinstance(key, (bytes, str)):
        return key.lower()

    if isinstance(key, Iterable):
        # Rebuild the container with its own type so that a section tuple
        # stays a (hashable) tuple.
        return type(key)(
            map(lower_key, key)
        )

    return key


class CaseInsensitiveDict(OrderedDict):
    """Ordered dictionary whose keys are matched case-insensitively.

    Keys are normalized with `lower_key` before they are stored or looked
    up, so the keys seen when iterating are the lower-cased forms.
    """

    @classmethod
    def make(cls, dict_in=None):
        """Build an instance from an optional mapping.

        :param dict_in: None, an instance of this class, or another mutable
            mapping
        :return: ``dict_in`` itself when it already is an instance of this
            class, otherwise a new instance holding its (lower-cased) items
        :raise TypeError: if ``dict_in`` is not a mutable mapping
        """
        if isinstance(dict_in, cls):
            return dict_in

        out = cls()

        if dict_in is None:
            return out

        if not isinstance(dict_in, MutableMapping):
            raise TypeError(
                "expected a mutable mapping, got %r" % (dict_in, ))

        for key, value in dict_in.items():
            out[key] = value

        return out

    def __setitem__(self, key, value, **kwargs):
        key = lower_key(key)

        super(CaseInsensitiveDict, self).__setitem__(key, value, **kwargs)

    def __getitem__(self, item):
        key = lower_key(item)

        return super(CaseInsensitiveDict, self).__getitem__(key)

    def __delitem__(self, key, **kwargs):
        # Normalize like __setitem__, otherwise a mixed-case key could be
        # stored but never deleted.
        key = lower_key(key)

        super(CaseInsensitiveDict, self).__delitem__(key, **kwargs)

    def __contains__(self, key):
        # Membership has to agree with __getitem__.
        return super(CaseInsensitiveDict, self).__contains__(lower_key(key))

    def get(self, key, default=SENTINAL):
        """Return the value stored for ``key``.

        :param key: Key to look up (case-insensitive)
        :param default: Value returned when the key is missing
        :return: The stored value; ``default`` if the key is missing and a
            default was given; otherwise a new empty instance of this class
            (note: not None, unlike ``dict.get``)
        """
        try:
            return self[key]
        except KeyError:
            pass

        if default is SENTINAL:
            return type(self)()

        return default

    def setdefault(self, key, default=SENTINAL):
        """Return the value for ``key``, inserting a default if missing.

        :param key: Key to look up (case-insensitive)
        :param default: Value to insert when the key is missing; when
            omitted a new empty instance of this class is inserted
        :return: The value now stored for ``key``
        """
        try:
            return self[key]
        except KeyError:
            self[key] = self.get(key, default)

        return self[key]


class Config(object):
    """A Git configuration."""

    def get(self, section, name):
        """Retrieve the contents of a configuration setting.

        :param section: Tuple with section name and optional subsection name
        :param name: Name of the setting
        :return: Contents of the setting
        :raise KeyError: if the value is not set
        """
        raise NotImplementedError(self.get)

    def get_boolean(self, section, name, default=None):
        """Retrieve a configuration setting as boolean.

        :param section: Tuple with section name and optional subsection name
        :param name: Name of the setting
        :param default: Value returned when the setting is not set
        :return: True or False, or ``default`` if the setting is not set
        :raise ValueError: if the value is neither "true" nor "false"
            (compared case-insensitively)
        """
        try:
            value = self.get(section, name)
        except KeyError:
            return default
        # ConfigDict.set() accepts real booleans, which have no .lower().
        if isinstance(value, bool):
            return value
        lowered = value.lower()
        if lowered == b"true":
            return True
        elif lowered == b"false":
            return False
        raise ValueError("not a valid boolean string: %r" % value)

    def set(self, section, name, value):
        """Set a configuration value.

        :param section: Tuple with section name and optional subsection name
        :param name: Name of the configuration value
        :param value: Value of the setting
        """
        raise NotImplementedError(self.set)

    def iteritems(self, section):
        """Iterate over the configuration pairs for a specific section.

        :param section: Tuple with section name and optional subsection name
        :return: Iterator over (name, value) pairs
        """
        raise NotImplementedError(self.iteritems)

    def itersections(self):
        """Iterate over the sections.

        :return: Iterator over section tuples
        """
        raise NotImplementedError(self.itersections)

    def has_section(self, name):
        """Check if a specified section exists.

        :param name: Name of section to check for
        :return: boolean indicating whether the section exists
        """
        return (name in self.itersections())


class ConfigDict(Config, MutableMapping):
    """Git configuration stored in a dictionary."""

    def __init__(self, values=None):
        """Create a new ConfigDict.

        :param values: Optional mapping of section tuples to mappings of
            setting names to values
        """
        self._values = CaseInsensitiveDict.make(values)

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self._values)

    def __eq__(self, other):
        return (
            isinstance(other, self.__class__) and
            other._values == self._values)

    def __getitem__(self, key):
        return self._values.__getitem__(key)

    def __setitem__(self, key, value):
        return self._values.__setitem__(key, value)

    def __delitem__(self, key):
        return self._values.__delitem__(key)

    def __iter__(self):
        return self._values.__iter__()

    def __len__(self):
        return self._values.__len__()

    @classmethod
    def _parse_setting(cls, name):
        """Split a dotted setting name into its parts.

        :param name: Text of the form "section.name" or
            "section.subsection.name"
        :return: Tuple (section, subsection or None, name)
        """
        parts = name.split(".")
        if len(parts) == 3:
            return (parts[0], parts[1], parts[2])
        else:
            return (parts[0], None, parts[1])

    @staticmethod
    def check_section_and_name(section, name):
        """Validate a section and setting name.

        :param section: Section name as bytes, or tuple of bytes with the
            section name and optional subsection name
        :param name: Name of the setting, as bytes
        :return: The section as a tuple
        :raise TypeError: if any part of the section, or the name, is not
            bytes
        """
        if not isinstance(section, tuple):
            section = (section, )
        if not all(isinstance(subsection, bytes) for subsection in section):
            raise TypeError(section)
        if not isinstance(name, bytes):
            raise TypeError(name)

        return section

    def get(self, section, name):
        section = self.check_section_and_name(section, name)

        if len(section) > 1:
            try:
                return self._values[section][name]
            except KeyError:
                pass

        # Fall back to the setting of the section without subsection.
        return self._values[(section[0],)][name]

    def set(self, section, name, value):
        section = self.check_section_and_name(section, name)

        if not isinstance(value, (bool, bytes)):
            raise TypeError(value)

        # setdefault() without a default inserts an empty
        # CaseInsensitiveDict for a section that does not exist yet.
        self._values.setdefault(section)[name] = value

    def iteritems(self, section):
        # get() without a default yields an empty CaseInsensitiveDict for an
        # unknown section, so this is empty rather than an error.
        return self._values.get(section).items()

    def itersections(self):
        return self._values.keys()


def _format_string(value):
    """Format a value for writing, quoting it when necessary.

    :param value: Value as bytes
    :return: Escaped value, wrapped in double quotes if it starts or ends
        with a space or tab or contains a '#'
    """
    if (value.startswith(b" ") or
            value.startswith(b"\t") or
            value.endswith(b" ") or
            b'#' in value or
            value.endswith(b"\t")):
        return b'"' + _escape_value(value) + b'"'
    else:
        return _escape_value(value)


# Maps the byte following a backslash to the byte it stands for.
_ESCAPE_TABLE = {
    ord(b"\\"): ord(b"\\"),
    ord(b"\""): ord(b"\""),
    ord(b"n"): ord(b"\n"),
    ord(b"t"): ord(b"\t"),
    ord(b"b"): ord(b"\b"),
    }
_COMMENT_CHARS = [ord(b"#"), ord(b";")]
_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]


def _parse_string(value):
    """Parse the raw text of a value into its contents.

    Resolves backslash escapes, removes double quotes, drops a trailing
    comment that starts outside quotes and drops trailing whitespace.

    :param value: Raw value as bytes
    :return: Parsed value as bytes
    :raise ValueError: on a dangling or unknown escape, or a missing end
        quote
    """
    value = bytearray(value.strip())
    ret = bytearray()
    # Whitespace is buffered and only emitted once another character
    # follows, which drops whitespace before a comment or the end.
    whitespace = bytearray()
    in_quotes = False
    i = 0
    while i < len(value):
        c = value[i]
        if c == ord(b"\\"):
            i += 1
            try:
                v = _ESCAPE_TABLE[value[i]]
            except IndexError:
                raise ValueError(
                    "escape character in %r at %d before end of string" %
                    (value, i))
            except KeyError:
                raise ValueError(
                    "escape character followed by unknown character "
                    "%s at %d in %r" % (value[i], i, value))
            if whitespace:
                ret.extend(whitespace)
                whitespace = bytearray()
            ret.append(v)
        elif c == ord(b"\""):
            in_quotes = (not in_quotes)
        elif c in _COMMENT_CHARS and not in_quotes:
            # the rest of the line is a comment
            break
        elif c in _WHITESPACE_CHARS:
            whitespace.append(c)
        else:
            if whitespace:
                ret.extend(whitespace)
                whitespace = bytearray()
            ret.append(c)
        i += 1

    if in_quotes:
        raise ValueError("missing end quote")

    return bytes(ret)


def _escape_value(value):
    """Escape a value."""
    value = value.replace(b"\\", b"\\\\")
    value = value.replace(b"\n", b"\\n")
    value = value.replace(b"\t", b"\\t")
    value = value.replace(b"\"", b"\\\"")
    return value


def _check_variable_name(name):
    """Check whether a variable name is alphanumeric, allowing '-'.

    :param name: Variable name as bytes
    :return: True if every character is allowed
    """
    # Slice rather than index: indexing bytes yields ints on Python 3.
    for i in range(len(name)):
        c = name[i:i+1]
        if not c.isalnum() and c != b'-':
            return False
    return True


def _check_section_name(name):
    """Check whether a section name is alphanumeric, allowing '-' and '.'.

    :param name: Section name as bytes
    :return: True if every character is allowed
    """
    # Slice rather than index: indexing bytes yields ints on Python 3.
    for i in range(len(name)):
        c = name[i:i+1]
        if not c.isalnum() and c not in (b'-', b'.'):
            return False
    return True


def _strip_comments(line):
    """Remove a trailing comment from a line.

    :param line: Line as bytes
    :return: The line up to the first '#' or ';' outside double quotes, or
        the whole line if there is none
    """
    comment_bytes = {ord(b"#"), ord(b";")}
    quote = ord(b'"')
    string_open = False
    # Normalize line to bytearray for simple 2/3 compatibility
    for i, character in enumerate(bytearray(line)):
        # Comment characters outside balanced quotes denote comment start
        if character == quote:
            string_open = not string_open
        elif not string_open and character in comment_bytes:
            return line[:i]
    return line


class ConfigFile(ConfigDict):
    """A Git configuration file, like .git/config or ~/.gitconfig.
    """

    @classmethod
    def from_file(cls, f):
        """Read configuration from a file-like object.

        :param f: File-like object yielding bytes lines
        :return: A new instance holding the parsed settings
        :raise ValueError: on a malformed section header, an invalid section
            or variable name, a setting before any section, or a malformed
            value
        """
        ret = cls()
        section = None
        # Name of the setting whose value continues on the next line.
        setting = None
        for line in f.readlines():
            line = line.lstrip()
            if setting is None:
                # Parse section header ("[bla]")
                if len(line) > 0 and line[:1] == b"[":
                    line = _strip_comments(line).rstrip()
                    try:
                        last = line.index(b"]")
                    except ValueError:
                        raise ValueError("expected trailing ]")
                    pts = line[1:last].split(b" ", 1)
                    # Anything after the header may be a setting.
                    line = line[last+1:]
                    pts[0] = pts[0].lower()
                    if len(pts) == 2:
                        if pts[1][:1] != b"\"" or pts[1][-1:] != b"\"":
                            raise ValueError(
                                "Invalid subsection %r" % pts[1])
                        else:
                            pts[1] = pts[1][1:-1]
                        if not _check_section_name(pts[0]):
                            raise ValueError("invalid section name %r" %
                                             pts[0])
                        section = (pts[0], pts[1])
                    else:
                        if not _check_section_name(pts[0]):
                            raise ValueError(
                                "invalid section name %r" % pts[0])
                        pts = pts[0].split(b".", 1)
                        if len(pts) == 2:
                            section = (pts[0], pts[1])
                        else:
                            section = (pts[0], )
                    # Register the section even if it has no settings.
                    ret._values.setdefault(section)
                if _strip_comments(line).strip() == b"":
                    continue
                if section is None:
                    raise ValueError("setting %r without section" % line)
                try:
                    setting, value = line.split(b"=", 1)
                except ValueError:
                    # A bare variable name means "true".
                    setting = line
                    value = b"true"
                setting = setting.strip().lower()
                if not _check_variable_name(setting):
                    raise ValueError("invalid variable name %s" % setting)
                if value.endswith(b"\\\n"):
                    continuation = value[:-2]
                else:
                    continuation = None
                    value = _parse_string(value)
                    ret._values[section][setting] = value
                    setting = None
            else:  # continuation line
                if line.endswith(b"\\\n"):
                    continuation += line[:-2]
                else:
                    continuation += line
                    value = _parse_string(continuation)
                    ret._values[section][setting] = value
                    continuation = None
                    setting = None
        return ret

    @classmethod
    def from_path(cls, path):
        """Read configuration from a file on disk.

        :param path: Path of the file to read; remembered as ``path`` on the
            returned instance
        """
        with GitFile(path, 'rb') as f:
            ret = cls.from_file(f)
            ret.path = path
            return ret

    def write_to_path(self, path=None):
        """Write configuration to a file on disk.

        :param path: Path to write to; defaults to ``self.path``
        """
        if path is None:
            path = self.path
        with GitFile(path, 'wb') as f:
            self.write_to_file(f)

    def write_to_file(self, f):
        """Write configuration to a file-like object.

        :param f: File-like object accepting bytes
        """
        for section, values in self._values.items():
            try:
                section_name, subsection_name = section
            except ValueError:
                (section_name, ) = section
                subsection_name = None
            if subsection_name is None:
                f.write(b"[" + section_name + b"]\n")
            else:
                f.write(b"[" + section_name +
                        b" \"" + subsection_name + b"\"]\n")
            for key, value in values.items():
                if value is True:
                    value = b"true"
                elif value is False:
                    value = b"false"
                else:
                    value = _format_string(value)
                f.write(b"\t" + key + b" = " + value + b"\n")


class StackedConfig(Config):
    """Configuration which reads from multiple config files.."""

    def __init__(self, backends, writable=None):
        """Create a new StackedConfig.

        :param backends: List of configurations, searched in order
        :param writable: Optional configuration that `set` writes to
        """
        self.backends = backends
        self.writable = writable

    def __repr__(self):
        return "<%s for %r>" % (self.__class__.__name__, self.backends)

    @classmethod
    def default(cls):
        """Create a StackedConfig over the default backends."""
        return cls(cls.default_backends())

    @classmethod
    def default_backends(cls):
        """Retrieve the default configuration.

        See git-config(1) for details on the files searched.

        :return: List of ConfigFile objects for the files that exist
        """
        paths = []
        paths.append(os.path.expanduser("~/.gitconfig"))

        xdg_config_home = os.environ.get(
            "XDG_CONFIG_HOME", os.path.expanduser("~/.config/"),
        )
        paths.append(os.path.join(xdg_config_home, "git", "config"))

        if "GIT_CONFIG_NOSYSTEM" not in os.environ:
            paths.append("/etc/gitconfig")

        backends = []
        for path in paths:
            try:
                cf = ConfigFile.from_path(path)
            except (IOError, OSError) as e:
                # A missing file is skipped; anything else is an error.
                if e.errno != errno.ENOENT:
                    raise
                else:
                    continue
            backends.append(cf)
        return backends

    def get(self, section, name):
        # Same validation as the dictionary-backed configurations.
        section = ConfigDict.check_section_and_name(section, name)
        for backend in self.backends:
            try:
                return backend.get(section, name)
            except KeyError:
                pass
        raise KeyError(name)

    def set(self, section, name, value):
        if self.writable is None:
            raise NotImplementedError(self.set)
        return self.writable.set(section, name, value)


def parse_submodules(config):
    """Parse a gitmodules GitConfig file, returning submodules.

    :param config: A `ConfigFile`
    :return: list of tuples (submodule path, url, name),
        where name is quoted part of the section's name.
    """
    for section in config.keys():
        # Sections without a subsection cannot describe a submodule;
        # unpacking them into two names would raise ValueError.
        if len(section) != 2:
            continue
        section_kind, section_name = section
        if section_kind == b'submodule':
            sm_path = config.get(section, b'path')
            sm_url = config.get(section, b'url')
            yield (sm_path, sm_url, section_name)


# test_config.py -- Tests for reading and writing configuration files
# Copyright (C) 2011 Jelmer Vernooij
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version.
# You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for reading and writing configuration files."""

from io import BytesIO
from dulwich.config import (
    ConfigDict,
    ConfigFile,
    StackedConfig,
    _check_section_name,
    _check_variable_name,
    _format_string,
    _escape_value,
    _parse_string,
    parse_submodules,
    )
from dulwich.tests import (
    TestCase,
    )


class ConfigFileTests(TestCase):
    """Tests for parsing and writing ConfigFile objects."""

    def from_file(self, text):
        # Helper: parse the given bytes as the contents of a config file.
        return ConfigFile.from_file(BytesIO(text))

    def test_empty(self):
        ConfigFile()

    def test_eq(self):
        self.assertEqual(ConfigFile(), ConfigFile())

    def test_default_config(self):
        cf = self.from_file(b"""[core]
\trepositoryformatversion = 0
\tfilemode = true
\tbare = false
\tlogallrefupdates = true
""")
        self.assertEqual(ConfigFile({(b"core", ): {
            b"repositoryformatversion": b"0",
            b"filemode": b"true",
            b"bare": b"false",
            b"logallrefupdates": b"true"}}), cf)

    def test_from_file_empty(self):
        cf = self.from_file(b"")
        self.assertEqual(ConfigFile(), cf)

    def test_empty_line_before_section(self):
        cf = self.from_file(b"\n[section]\n")
        self.assertEqual(ConfigFile({(b"section", ): {}}), cf)

    def test_comment_before_section(self):
        cf = self.from_file(b"# foo\n[section]\n")
        self.assertEqual(ConfigFile({(b"section", ): {}}), cf)

    def test_comment_after_section(self):
        cf = self.from_file(b"[section] # foo\n")
        self.assertEqual(ConfigFile({(b"section", ): {}}), cf)

    def test_comment_after_variable(self):
        cf = self.from_file(b"[section]\nbar= foo # a comment\n")
        self.assertEqual(ConfigFile({(b"section", ): {b"bar": b"foo"}}), cf)

    def test_comment_character_within_value_string(self):
        # A '#' inside double quotes is part of the value, not a comment.
        cf = self.from_file(b"[section]\nbar= \"foo#bar\"\n")
        self.assertEqual(
            ConfigFile({(b"section", ): {b"bar": b"foo#bar"}}), cf)

    def test_comment_character_within_section_string(self):
        # A '#' inside a quoted subsection name is part of the name.
        cf = self.from_file(b"[branch \"foo#bar\"] # a comment\nbar= foo\n")
        self.assertEqual(
            ConfigFile({(b"branch", b"foo#bar"): {b"bar": b"foo"}}), cf)

    def test_from_file_section(self):
        cf = self.from_file(b"[core]\nfoo = bar\n")
        self.assertEqual(b"bar", cf.get((b"core", ), b"foo"))
        # An unknown subsection is expected to yield the section's value.
        self.assertEqual(b"bar", cf.get((b"core", b"foo"), b"foo"))

    def test_from_file_section_case_insensitive_lower(self):
        # Mixed-case names in the file, lower-case names in the lookup.
        cf = self.from_file(b"[cOre]\nfOo = bar\n")
        self.assertEqual(b"bar", cf.get((b"core", ), b"foo"))
        self.assertEqual(b"bar", cf.get((b"core", b"foo"), b"foo"))

    def test_from_file_section_case_insensitive_mixed(self):
        # Mixed-case names in both the file and the lookup.
        cf = self.from_file(b"[cOre]\nfOo = bar\n")
        self.assertEqual(b"bar", cf.get((b"core", ), b"fOo"))
        self.assertEqual(b"bar", cf.get((b"cOre", b"fOo"), b"fOo"))

    def test_from_file_with_mixed_quoted(self):
        cf = self.from_file(b"[core]\nfoo = \"bar\"la\n")
        self.assertEqual(b"barla", cf.get((b"core", ), b"foo"))

    def test_from_file_section_with_open_brackets(self):
        self.assertRaises(ValueError, self.from_file, b"[core\nfoo = bar\n")

    def test_from_file_value_with_open_quoted(self):
        self.assertRaises(ValueError, self.from_file, b"[core]\nfoo = \"bar\n")

    def test_from_file_with_quotes(self):
        cf = self.from_file(
            b"[core]\n"
            b'foo = " bar"\n')
        self.assertEqual(b" bar", cf.get((b"core", ), b"foo"))

    def test_from_file_with_interrupted_line(self):
        # A backslash before the newline continues the value on the next
        # line.
        cf = self.from_file(
            b"[core]\n"
            b'foo = bar\\\n'
            b' la\n')
        self.assertEqual(b"barla", cf.get((b"core", ), b"foo"))

    def test_from_file_with_boolean_setting(self):
        # A variable without "=" is expected to read back as b"true".
        cf = self.from_file(
            b"[core]\n"
            b'foo\n')
        self.assertEqual(b"true", cf.get((b"core", ), b"foo"))

    def test_from_file_subsection(self):
        cf = self.from_file(b"[branch \"foo\"]\nfoo = bar\n")
        self.assertEqual(b"bar", cf.get((b"branch", b"foo"), b"foo"))

    def test_from_file_subsection_invalid(self):
        self.assertRaises(
            ValueError, self.from_file, b"[branch \"foo]\nfoo = bar\n")

    def test_from_file_subsection_not_quoted(self):
        cf = self.from_file(b"[branch.foo]\nfoo = bar\n")
        self.assertEqual(b"bar", cf.get((b"branch", b"foo"), b"foo"))

    def test_write_to_file_empty(self):
        c = ConfigFile()
        f = BytesIO()
        c.write_to_file(f)
        self.assertEqual(b"", f.getvalue())

    def test_write_to_file_section(self):
        c = ConfigFile()
        c.set((b"core", ), b"foo", b"bar")
        f = BytesIO()
        c.write_to_file(f)
        self.assertEqual(b"[core]\n\tfoo = bar\n", f.getvalue())

    def test_write_to_file_subsection(self):
        c = ConfigFile()
        c.set((b"branch", b"blie"), b"foo", b"bar")
        f = BytesIO()
        c.write_to_file(f)
        self.assertEqual(b"[branch \"blie\"]\n\tfoo = bar\n", f.getvalue())

    def test_same_line(self):
        # A setting may follow the section header on the same line.
        cf = self.from_file(b"[branch.foo] foo = bar\n")
        self.assertEqual(b"bar", cf.get((b"branch", b"foo"), b"foo"))

    def test_quoted(self):
        cf = self.from_file(b"""[gui]
\tfontdiff = -family \\\"Ubuntu Mono\\\" -size 11 -overstrike 0
""")
        self.assertEqual(ConfigFile({(b'gui', ): {
            b'fontdiff': b'-family "Ubuntu Mono" -size 11 -overstrike 0',
        }}), cf)

    def test_quoted_multiline(self):
        cf = self.from_file(b"""[alias]
who = \"!who() {\\
  git log --no-merges --pretty=format:'%an - %ae' $@ | uniq -c | sort -rn;\\
};\\
who\"
""")
        self.assertEqual(ConfigFile({(b'alias', ): {
            b'who': (b"!who() {git log --no-merges --pretty=format:'%an - "
                     b"%ae' $@ | uniq -c | sort -rn;};who")
            }}), cf)

    def test_set_hash_gets_quoted(self):
        # A value containing '#' must be written in quotes.
        c = ConfigFile()
        c.set(b"xandikos", b"color", b"#665544")
        f = BytesIO()
        c.write_to_file(f)
        self.assertEqual(b"[xandikos]\n\tcolor = \"#665544\"\n", f.getvalue())


class ConfigDictTests(TestCase):
    """Tests for the dictionary-backed ConfigDict."""

    def test_get_set(self):
        cd = ConfigDict()
        self.assertRaises(KeyError, cd.get, b"foo", b"core")
        cd.set((b"core", ), b"foo", b"bla")
        self.assertEqual(b"bla", cd.get((b"core", ), b"foo"))
        cd.set((b"core", ), b"foo", b"bloe")
        self.assertEqual(b"bloe", cd.get((b"core", ), b"foo"))

    def test_get_boolean(self):
        cd = ConfigDict()
        cd.set((b"core", ), b"foo", b"true")
        self.assertTrue(cd.get_boolean((b"core", ), b"foo"))
        cd.set((b"core", ), b"foo", b"false")
        self.assertFalse(cd.get_boolean((b"core", ), b"foo"))
        cd.set((b"core", ), b"foo", b"invalid")
        self.assertRaises(ValueError, cd.get_boolean, (b"core", ), b"foo")

    def test_dict(self):
        # Exercises the mapping interface (keys, item access, assignment).
        cd = ConfigDict()
        cd.set((b"core", ), b"foo", b"bla")
        cd.set((b"core2", ), b"foo", b"bloe")

        self.assertEqual([(b"core", ), (b"core2", )], list(cd.keys()))
        self.assertEqual(cd[(b"core", )], {b'foo': b'bla'})

        cd[b'a'] = b'b'
        self.assertEqual(cd[b'a'], b'b')

    def test_iteritems(self):
        cd = ConfigDict()
        cd.set((b"core", ), b"foo", b"bla")
        cd.set((b"core2", ), b"foo", b"bloe")

        self.assertEqual(
            [(b'foo', b'bla')],
            list(cd.iteritems((b"core", ))))

    def test_iteritems_nonexistant(self):
        # An unknown section is expected to iterate as empty.
        cd = ConfigDict()
        cd.set((b"core2", ), b"foo", b"bloe")

        self.assertEqual(
            [], list(cd.iteritems((b"core", ))))

    def test_itersections(self):
        cd = ConfigDict()
        cd.set((b"core2", ), b"foo", b"bloe")

        self.assertEqual(
            [(b"core2", )],
            list(cd.itersections()))


class StackedConfigTests(TestCase):
    """Tests for StackedConfig."""

    def test_default_backends(self):
        # Only checks that collecting the default backends does not raise.
        StackedConfig.default_backends()


class EscapeValueTests(TestCase):
    """Tests for _escape_value."""

    def test_nothing(self):
        self.assertEqual(b"foo", _escape_value(b"foo"))

    def test_backslash(self):
        self.assertEqual(b"foo\\\\", _escape_value(b"foo\\"))

    def test_newline(self):
        self.assertEqual(b"foo\\n", _escape_value(b"foo\n"))


class FormatStringTests(TestCase):
    """Tests for _format_string."""

    def test_quoted(self):
        self.assertEqual(b'" foo"', _format_string(b" foo"))
        self.assertEqual(b'"\\tfoo"', _format_string(b"\tfoo"))

    def test_not_quoted(self):
        self.assertEqual(b'foo', _format_string(b"foo"))
        self.assertEqual(b'foo bar', _format_string(b"foo bar"))


class ParseStringTests(TestCase):
    """Tests for _parse_string."""

    def test_quoted(self):
        self.assertEqual(b' foo', _parse_string(b'" foo"'))
        self.assertEqual(b'\tfoo', _parse_string(b'"\\tfoo"'))

    def test_not_quoted(self):
        self.assertEqual(b'foo', _parse_string(b"foo"))
        self.assertEqual(b'foo bar', _parse_string(b"foo bar"))

    def test_nothing(self):
        self.assertEqual(b"", _parse_string(b''))

    def test_tab(self):
        self.assertEqual(b"\tbar\t", _parse_string(b"\\tbar\\t"))

    def test_newline(self):
        # The literal trailing tab is dropped; the escaped one is kept.
        self.assertEqual(b"\nbar\t", _parse_string(b"\\nbar\\t\t"))

    def test_quote(self):
        self.assertEqual(b"\"foo\"", _parse_string(b"\\\"foo\\\""))


class CheckVariableNameTests(TestCase):
    """Tests for _check_variable_name."""

    def test_invalid(self):
        self.assertFalse(_check_variable_name(b"foo "))
        self.assertFalse(_check_variable_name(b"bar,bar"))
        self.assertFalse(_check_variable_name(b"bar.bar"))

    def test_valid(self):
        self.assertTrue(_check_variable_name(b"FOO"))
        self.assertTrue(_check_variable_name(b"foo"))
        self.assertTrue(_check_variable_name(b"foo-bar"))


class CheckSectionNameTests(TestCase):
    """Tests for _check_section_name."""

    def test_invalid(self):
        self.assertFalse(_check_section_name(b"foo "))
        self.assertFalse(_check_section_name(b"bar,bar"))

    def test_valid(self):
        self.assertTrue(_check_section_name(b"FOO"))
        self.assertTrue(_check_section_name(b"foo"))
        self.assertTrue(_check_section_name(b"foo-bar"))
        self.assertTrue(_check_section_name(b"bar.bar"))


class SubmodulesTests(TestCase):
    """Tests for parse_submodules."""

    def testSubmodules(self):
        cf = ConfigFile.from_file(BytesIO(b"""\
[submodule "core/lib"]
\tpath = core/lib
\turl = https://github.com/phhusson/QuasselC.git
"""))
        got = list(parse_submodules(cf))
        self.assertEqual([
            (b'core/lib',
             b'https://github.com/phhusson/QuasselC.git',
             b'core/lib')], got)