diff --git a/dulwich/objectspec.py b/dulwich/objectspec.py index f588d313..6cd537c2 100644 --- a/dulwich/objectspec.py +++ b/dulwich/objectspec.py @@ -1,234 +1,235 @@ # objectspec.py -- Object specification # Copyright (C) 2014 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Object specification.""" def to_bytes(text): if getattr(text, "encode", None) is not None: text = text.encode('ascii') return text def parse_object(repo, objectish): """Parse a string referring to an object. Args: repo: A `Repo` object objectish: A string referring to an object Returns: A git object Raises: KeyError: If the object can not be found """ objectish = to_bytes(objectish) return repo[objectish] def parse_tree(repo, treeish): """Parse a string referring to a tree. Args: repo: A `Repo` object treeish: A string referring to a tree Returns: A git object Raises: KeyError: If the object can not be found """ treeish = to_bytes(treeish) o = repo[treeish] if o.type_name == b"commit": return repo[o.tree] return o def parse_ref(container, refspec): """Parse a string referring to a reference. Args: container: A RefsContainer object refspec: A string referring to a ref Returns: A ref Raises: KeyError: If the ref can not be found """ refspec = to_bytes(refspec) possible_refs = [ refspec, b"refs/" + refspec, b"refs/tags/" + refspec, b"refs/heads/" + refspec, b"refs/remotes/" + refspec, b"refs/remotes/" + refspec + b"/HEAD" ] for ref in possible_refs: if ref in container: return ref raise KeyError(refspec) -def parse_reftuple(lh_container, rh_container, refspec): +def parse_reftuple(lh_container, rh_container, refspec, force=False): """Parse a reftuple spec. Args: lh_container: A RefsContainer object hh_container: A RefsContainer object refspec: A string Returns: A tuple with left and right ref Raises: KeyError: If one of the refs can not be found """ refspec = to_bytes(refspec) if refspec.startswith(b"+"): force = True refspec = refspec[1:] - else: - force = False if b":" in refspec: (lh, rh) = refspec.split(b":") else: lh = rh = refspec if lh == b"": lh = None else: lh = parse_ref(lh_container, lh) if rh == b"": rh = None else: try: rh = parse_ref(rh_container, rh) except KeyError: # TODO: check force? if b"/" not in rh: rh = b"refs/heads/" + rh return (lh, rh, force) -def parse_reftuples(lh_container, rh_container, refspecs): +def parse_reftuples( + lh_container, rh_container, refspecs, force=False): """Parse a list of reftuple specs to a list of reftuples. Args: lh_container: A RefsContainer object hh_container: A RefsContainer object refspecs: A list of refspecs or a string + force: Force overwriting for all reftuples Returns: A list of refs Raises: KeyError: If one of the refs can not be found """ if not isinstance(refspecs, list): refspecs = [refspecs] ret = [] # TODO: Support * in refspecs for refspec in refspecs: - ret.append(parse_reftuple(lh_container, rh_container, refspec)) + ret.append(parse_reftuple( + lh_container, rh_container, refspec, force=force)) return ret def parse_refs(container, refspecs): """Parse a list of refspecs to a list of refs. Args: container: A RefsContainer object refspecs: A list of refspecs or a string Returns: A list of refs Raises: KeyError: If one of the refs can not be found """ # TODO: Support * in refspecs if not isinstance(refspecs, list): refspecs = [refspecs] ret = [] for refspec in refspecs: ret.append(parse_ref(container, refspec)) return ret def parse_commit_range(repo, committishs): """Parse a string referring to a range of commits. Args: repo: A `Repo` object committishs: A string referring to a range of commits. Returns: An iterator over `Commit` objects Raises: KeyError: When the reference commits can not be found ValueError: If the range can not be parsed """ committishs = to_bytes(committishs) # TODO(jelmer): Support more than a single commit.. return iter([parse_commit(repo, committishs)]) class AmbiguousShortId(Exception): """The short id is ambiguous.""" def __init__(self, prefix, options): self.prefix = prefix self.options = options def scan_for_short_id(object_store, prefix): """Scan an object store for a short id.""" # TODO(jelmer): This could short-circuit looking for objects # starting with a certain prefix. ret = [] for object_id in object_store: if object_id.startswith(prefix): ret.append(object_store[object_id]) if not ret: raise KeyError(prefix) if len(ret) == 1: return ret[0] raise AmbiguousShortId(prefix, ret) def parse_commit(repo, committish): """Parse a string referring to a single commit. Args: repo: A` Repo` object commitish: A string referring to a single commit. Returns: A Commit object Raises: KeyError: When the reference commits can not be found ValueError: If the range can not be parsed """ committish = to_bytes(committish) try: return repo[committish] except KeyError: pass try: return repo[parse_ref(repo, committish)] except KeyError: pass if len(committish) >= 4 and len(committish) < 40: try: int(committish, 16) except ValueError: pass else: try: return scan_for_short_id(repo.object_store, committish) except KeyError: pass raise KeyError(committish) # TODO: parse_path_in_tree(), which handles e.g. v1.0:Documentation diff --git a/dulwich/tests/test_objectspec.py b/dulwich/tests/test_objectspec.py index 1ddb5789..37a20f76 100644 --- a/dulwich/tests/test_objectspec.py +++ b/dulwich/tests/test_objectspec.py @@ -1,233 +1,238 @@ # test_objectspec.py -- tests for objectspec.py # Copyright (C) 2014 Jelmer Vernooij # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for revision spec parsing.""" # TODO: Round-trip parse-serialize-parse and serialize-parse-serialize tests. from dulwich.objects import ( Blob, ) from dulwich.objectspec import ( parse_object, parse_commit, parse_commit_range, parse_ref, parse_refs, parse_reftuple, parse_reftuples, parse_tree, ) from dulwich.repo import MemoryRepo from dulwich.tests import ( TestCase, ) from dulwich.tests.utils import ( build_commit_graph, ) class ParseObjectTests(TestCase): """Test parse_object.""" def test_nonexistent(self): r = MemoryRepo() self.assertRaises(KeyError, parse_object, r, "thisdoesnotexist") def test_blob_by_sha(self): r = MemoryRepo() b = Blob.from_string(b"Blah") r.object_store.add_object(b) self.assertEqual(b, parse_object(r, b.id)) class ParseCommitRangeTests(TestCase): """Test parse_commit_range.""" def test_nonexistent(self): r = MemoryRepo() self.assertRaises(KeyError, parse_commit_range, r, "thisdoesnotexist") def test_commit_by_sha(self): r = MemoryRepo() c1, c2, c3 = build_commit_graph( r.object_store, [[1], [2, 1], [3, 1, 2]]) self.assertEqual([c1], list(parse_commit_range(r, c1.id))) class ParseCommitTests(TestCase): """Test parse_commit.""" def test_nonexistent(self): r = MemoryRepo() self.assertRaises(KeyError, parse_commit, r, "thisdoesnotexist") def test_commit_by_sha(self): r = MemoryRepo() [c1] = build_commit_graph(r.object_store, [[1]]) self.assertEqual(c1, parse_commit(r, c1.id)) def test_commit_by_short_sha(self): r = MemoryRepo() [c1] = build_commit_graph(r.object_store, [[1]]) self.assertEqual(c1, parse_commit(r, c1.id[:10])) class ParseRefTests(TestCase): def test_nonexistent(self): r = {} self.assertRaises(KeyError, parse_ref, r, b"thisdoesnotexist") def test_ambiguous_ref(self): r = {b"ambig1": 'bla', b"refs/ambig1": 'bla', b"refs/tags/ambig1": 'bla', b"refs/heads/ambig1": 'bla', b"refs/remotes/ambig1": 'bla', b"refs/remotes/ambig1/HEAD": "bla"} self.assertEqual(b"ambig1", parse_ref(r, b"ambig1")) def test_ambiguous_ref2(self): r = {b"refs/ambig2": 'bla', b"refs/tags/ambig2": 'bla', b"refs/heads/ambig2": 'bla', b"refs/remotes/ambig2": 'bla', b"refs/remotes/ambig2/HEAD": "bla"} self.assertEqual(b"refs/ambig2", parse_ref(r, b"ambig2")) def test_ambiguous_tag(self): r = {b"refs/tags/ambig3": 'bla', b"refs/heads/ambig3": 'bla', b"refs/remotes/ambig3": 'bla', b"refs/remotes/ambig3/HEAD": "bla"} self.assertEqual(b"refs/tags/ambig3", parse_ref(r, b"ambig3")) def test_ambiguous_head(self): r = {b"refs/heads/ambig4": 'bla', b"refs/remotes/ambig4": 'bla', b"refs/remotes/ambig4/HEAD": "bla"} self.assertEqual(b"refs/heads/ambig4", parse_ref(r, b"ambig4")) def test_ambiguous_remote(self): r = {b"refs/remotes/ambig5": 'bla', b"refs/remotes/ambig5/HEAD": "bla"} self.assertEqual(b"refs/remotes/ambig5", parse_ref(r, b"ambig5")) def test_ambiguous_remote_head(self): r = {b"refs/remotes/ambig6/HEAD": "bla"} self.assertEqual(b"refs/remotes/ambig6/HEAD", parse_ref(r, b"ambig6")) def test_heads_full(self): r = {b"refs/heads/foo": "bla"} self.assertEqual(b"refs/heads/foo", parse_ref(r, b"refs/heads/foo")) def test_heads_partial(self): r = {b"refs/heads/foo": "bla"} self.assertEqual(b"refs/heads/foo", parse_ref(r, b"heads/foo")) def test_tags_partial(self): r = {b"refs/tags/foo": "bla"} self.assertEqual(b"refs/tags/foo", parse_ref(r, b"tags/foo")) class ParseRefsTests(TestCase): def test_nonexistent(self): r = {} self.assertRaises(KeyError, parse_refs, r, [b"thisdoesnotexist"]) def test_head(self): r = {b"refs/heads/foo": "bla"} self.assertEqual([b"refs/heads/foo"], parse_refs(r, [b"foo"])) def test_full(self): r = {b"refs/heads/foo": "bla"} self.assertEqual([b"refs/heads/foo"], parse_refs(r, b"refs/heads/foo")) class ParseReftupleTests(TestCase): def test_nonexistent(self): r = {} self.assertRaises(KeyError, parse_reftuple, r, r, b"thisdoesnotexist") def test_head(self): r = {b"refs/heads/foo": "bla"} self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", False), parse_reftuple(r, r, b"foo")) self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", True), parse_reftuple(r, r, b"+foo")) self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", True), parse_reftuple(r, {}, b"+foo")) + self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", True), + parse_reftuple(r, {}, b"foo", True)) def test_full(self): r = {b"refs/heads/foo": "bla"} self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", False), parse_reftuple(r, r, b"refs/heads/foo")) def test_no_left_ref(self): r = {b"refs/heads/foo": "bla"} self.assertEqual((None, b"refs/heads/foo", False), parse_reftuple(r, r, b":refs/heads/foo")) def test_no_right_ref(self): r = {b"refs/heads/foo": "bla"} self.assertEqual((b"refs/heads/foo", None, False), parse_reftuple(r, r, b"refs/heads/foo:")) def test_default_with_string(self): r = {b"refs/heads/foo": "bla"} self.assertEqual((b"refs/heads/foo", b"refs/heads/foo", False), parse_reftuple(r, r, "foo")) class ParseReftuplesTests(TestCase): def test_nonexistent(self): r = {} self.assertRaises(KeyError, parse_reftuples, r, r, [b"thisdoesnotexist"]) def test_head(self): r = {b"refs/heads/foo": "bla"} self.assertEqual([(b"refs/heads/foo", b"refs/heads/foo", False)], parse_reftuples(r, r, [b"foo"])) def test_full(self): r = {b"refs/heads/foo": "bla"} self.assertEqual([(b"refs/heads/foo", b"refs/heads/foo", False)], parse_reftuples(r, r, b"refs/heads/foo")) + r = {b"refs/heads/foo": "bla"} + self.assertEqual([(b"refs/heads/foo", b"refs/heads/foo", True)], + parse_reftuples(r, r, b"refs/heads/foo", True)) class ParseTreeTests(TestCase): """Test parse_tree.""" def test_nonexistent(self): r = MemoryRepo() self.assertRaises(KeyError, parse_tree, r, "thisdoesnotexist") def test_from_commit(self): r = MemoryRepo() c1, c2, c3 = build_commit_graph( r.object_store, [[1], [2, 1], [3, 1, 2]]) self.assertEqual(r[c1.tree], parse_tree(r, c1.id)) self.assertEqual(r[c1.tree], parse_tree(r, c1.tree))