diff --git a/dulwich/merge_base.py b/dulwich/merge_base.py new file mode 100755 index 00000000..99293e97 --- /dev/null +++ b/dulwich/merge_base.py @@ -0,0 +1,133 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab +# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# for a copy of the GNU General Public License +# and for a copy of the Apache +# License, Version 2.0. + +""" +Implementation of merge-base following the approach of git +""" + +from collections import deque + + +def _find_lcas(lookup_parents, c1, c2s): + cands = [] + cstates = {} + + # Flags to Record State + _ANC_OF_1 = 1 # ancestor of commit 1 + _ANC_OF_2 = 2 # ancestor of commit 2 + _DNC = 4 # Do Not Consider + _LCA = 8 # potential LCA + + def _has_candidates(wlst, cstates): + for cmt in wlst: + if cmt in cstates: + if not (cstates[cmt] & _DNC): + return True + return False + + # initialize the working list + wlst = deque() + cstates[c1] = _ANC_OF_1 + wlst.append(c1) + for c2 in c2s: + cstates[c2] = _ANC_OF_2 + wlst.append(c2) + + # loop until no other LCA candidates are viable in working list + # adding any parents to the list in a breadth first manner + while _has_candidates(wlst, cstates): + cmt = wlst.popleft() + flags = cstates[cmt] + if flags == (_ANC_OF_1 | _ANC_OF_2): + # potential common ancestor + if not (flags & _LCA): + flags = flags | _LCA + cstates[cmt] = flags + cands.append(cmt) + # mark any parents of this node _DNC as all parents + # would be one level further removed common ancestors + flags = flags | _DNC + parents = lookup_parents(cmt) + if parents: + for pcmt in parents: + if pcmt in cstates: + cstates[pcmt] = cstates[pcmt] | flags + else: + cstates[pcmt] = flags + wlst.append(pcmt) + + # walk final candidates removing any superceded by _DNC by later lower LCAs + results = [] + for cmt in cands: + if not (cstates[cmt] & _DNC): + results.append(cmt) + return results + + +def find_merge_base(object_store, commit_ids): + """Find lowest common ancestors of commit_ids[0] and *any* of commits_ids[1:] + + Args: + object_store: object store + commit_ids: list of commit ids + Returns: + list of lowest common ancestor commit_ids + """ + def lookup_parents(commit_id): + return object_store[commit_id].parents + + if not commit_ids: + return [] + c1 = commit_ids[0] + if not len(commit_ids) > 1: + return [c1] + c2s = commit_ids[1:] + if c1 in c2s: + return [c1] + return _find_lcas(lookup_parents, c1, c2s) + + +def find_octopus_base(object_store, commit_ids): + """Find lowest common ancestors of *all* provided commit_ids + + Args: + object_store: Object store + commit_ids: list of commit ids + Returns: + list of lowest common ancestor commit_ids + """ + + def lookup_parents(commit_id): + return object_store[commit_id].parents + + if not commit_ids: + return [] + if len(commit_ids) <= 2: + return find_merge_base(object_store, commit_ids) + lcas = [commit_ids[0]] + others = commit_ids[1:] + for cmt in others: + next_lcas = [] + for ca in lcas: + res = _find_lcas(lookup_parents, cmt, [ca]) + next_lcas.extend(res) + lcas = next_lcas[:] + return lcas diff --git a/dulwich/tests/__init__.py b/dulwich/tests/__init__.py index f46cff60..351665d6 100644 --- a/dulwich/tests/__init__.py +++ b/dulwich/tests/__init__.py @@ -1,198 +1,199 @@ # __init__.py -- The tests for dulwich # Copyright (C) 2007 James Westby # # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # """Tests for Dulwich.""" import doctest import os import shutil import subprocess import sys import tempfile # If Python itself provides an exception, use that import unittest from unittest import ( # noqa: F401 SkipTest, TestCase as _TestCase, skipIf, expectedFailure, ) class TestCase(_TestCase): def setUp(self): super(TestCase, self).setUp() self._old_home = os.environ.get("HOME") os.environ["HOME"] = "/nonexistant" def tearDown(self): super(TestCase, self).tearDown() if self._old_home: os.environ["HOME"] = self._old_home else: del os.environ["HOME"] class BlackboxTestCase(TestCase): """Blackbox testing.""" # TODO(jelmer): Include more possible binary paths. bin_directories = [os.path.abspath(os.path.join( os.path.dirname(__file__), "..", "..", "bin")), '/usr/bin', '/usr/local/bin'] def bin_path(self, name): """Determine the full path of a binary. Args: name: Name of the script Returns: Full path """ for d in self.bin_directories: p = os.path.join(d, name) if os.path.isfile(p): return p else: raise SkipTest("Unable to find binary %s" % name) def run_command(self, name, args): """Run a Dulwich command. Args: name: Name of the command, as it exists in bin/ args: Arguments to the command """ env = dict(os.environ) env["PYTHONPATH"] = os.pathsep.join(sys.path) # Since they don't have any extensions, Windows can't recognize # executablility of the Python files in /bin. Even then, we'd have to # expect the user to set up file associations for .py files. # # Save us from all that headache and call python with the bin script. argv = [sys.executable, self.bin_path(name)] + args return subprocess.Popen( argv, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env=env) def self_test_suite(): names = [ 'archive', 'blackbox', 'client', 'config', 'diff_tree', 'fastexport', 'file', 'grafts', 'greenthreads', 'hooks', 'ignore', 'index', 'lfs', 'line_ending', 'lru_cache', 'mailmap', + 'merge_base', 'objects', 'objectspec', 'object_store', 'missing_obj_finder', 'pack', 'patch', 'porcelain', 'protocol', 'reflog', 'refs', 'repository', 'server', 'stash', 'utils', 'walk', 'web', ] module_names = ['dulwich.tests.test_' + name for name in names] loader = unittest.TestLoader() return loader.loadTestsFromNames(module_names) def tutorial_test_suite(): import dulwich.client # noqa: F401 import dulwich.config # noqa: F401 import dulwich.index # noqa: F401 import dulwich.reflog # noqa: F401 import dulwich.repo # noqa: F401 import dulwich.server # noqa: F401 import dulwich.patch # noqa: F401 tutorial = [ 'introduction', 'file-format', 'repo', 'object-store', 'remote', 'conclusion', ] tutorial_files = ["../../docs/tutorial/%s.txt" % name for name in tutorial] def setup(test): test.__old_cwd = os.getcwd() test.tempdir = tempfile.mkdtemp() test.globs.update({'tempdir': test.tempdir}) os.chdir(test.tempdir) def teardown(test): os.chdir(test.__old_cwd) shutil.rmtree(test.tempdir) return doctest.DocFileSuite( module_relative=True, package='dulwich.tests', setUp=setup, tearDown=teardown, *tutorial_files) def nocompat_test_suite(): result = unittest.TestSuite() result.addTests(self_test_suite()) result.addTests(tutorial_test_suite()) from dulwich.contrib import test_suite as contrib_test_suite result.addTests(contrib_test_suite()) return result def compat_test_suite(): result = unittest.TestSuite() from dulwich.tests.compat import test_suite as compat_test_suite result.addTests(compat_test_suite()) return result def test_suite(): result = unittest.TestSuite() result.addTests(self_test_suite()) if sys.platform != 'win32': result.addTests(tutorial_test_suite()) from dulwich.tests.compat import test_suite as compat_test_suite result.addTests(compat_test_suite()) from dulwich.contrib import test_suite as contrib_test_suite result.addTests(contrib_test_suite()) return result diff --git a/dulwich/tests/test_merge_base.py b/dulwich/tests/test_merge_base.py new file mode 100644 index 00000000..6584d9ee --- /dev/null +++ b/dulwich/tests/test_merge_base.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +# test_index.py -- Tests for merge +# encoding: utf-8 +# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada +# +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# for a copy of the GNU General Public License +# and for a copy of the Apache +# License, Version 2.0. + +"""Tests for merge_base.""" + +from dulwich.tests import TestCase + +from dulwich.merge_base import _find_lcas + + +class FindMergeBaseTests(TestCase): + + @staticmethod + def run_test(dag, inputs): + def lookup_parents(commit_id): + return dag[commit_id] + c1 = inputs[0] + c2s = inputs[1:] + return set(_find_lcas(lookup_parents, c1, c2s)) + + def test_multiple_lca(self): + # two lowest common ancestors + graph = { + '5': ['1', '2'], + '4': ['3', '1'], + '3': ['2'], + '2': ['0'], + '1': [], + '0': [] + } + self.assertEqual(self.run_test(graph, ['4', '5']), set(['1', '2'])) + + def test_no_common_ancestor(self): + # no common ancestor + graph = { + '4': ['2'], + '3': ['1'], + '2': [], + '1': ['0'], + '0': [], + } + self.assertEqual(self.run_test(graph, ['4', '3']), set([])) + + def test_ancestor(self): + # ancestor + graph = { + 'G': ['D', 'F'], + 'F': ['E'], + 'D': ['C'], + 'C': ['B'], + 'E': ['B'], + 'B': ['A'], + 'A': [] + } + self.assertEqual(self.run_test(graph, ['D', 'C']), set(['C'])) + + def test_direct_parent(self): + # parent + graph = { + 'G': ['D', 'F'], + 'F': ['E'], + 'D': ['C'], + 'C': ['B'], + 'E': ['B'], + 'B': ['A'], + 'A': [] + } + self.assertEqual(self.run_test(graph, ['G', 'D']), set(['D'])) + + def test_another_crossover(self): + # Another cross over + graph = { + 'G': ['D', 'F'], + 'F': ['E', 'C'], + 'D': ['C', 'E'], + 'C': ['B'], + 'E': ['B'], + 'B': ['A'], + 'A': [] + } + self.assertEqual(self.run_test(graph, ['D', 'F']), set(['E', 'C'])) + + def test_three_way_merge_lca(self): + # three way merge commit straight from git docs + graph = { + 'C': ['C1'], + 'C1': ['C2'], + 'C2': ['C3'], + 'C3': ['C4'], + 'C4': ['2'], + 'B': ['B1'], + 'B1': ['B2'], + 'B2': ['B3'], + 'B3': ['1'], + 'A': ['A1'], + 'A1': ['A2'], + 'A2': ['A3'], + 'A3': ['1'], + '1': ['2'], + '2': [], + } + # assumes a theoretical merge M exists that merges B and C first + # which actually means find the first LCA from either of B OR C with A + self.assertEqual(self.run_test(graph, ['A', 'B', 'C']), set(['1'])) + + def test_octopus(self): + # octopus algorithm test + # test straight from git docs of A, B, and C + # but this time use octopus to find lcas of A, B, and C simultaneously + graph = { + 'C': ['C1'], + 'C1': ['C2'], + 'C2': ['C3'], + 'C3': ['C4'], + 'C4': ['2'], + 'B': ['B1'], + 'B1': ['B2'], + 'B2': ['B3'], + 'B3': ['1'], + 'A': ['A1'], + 'A1': ['A2'], + 'A2': ['A3'], + 'A3': ['1'], + '1': ['2'], + '2': [], + } + + def lookup_parents(cid): + return graph[cid] + lcas = ['A'] + others = ['B', 'C'] + for cmt in others: + next_lcas = [] + for ca in lcas: + res = _find_lcas(lookup_parents, cmt, [ca]) + next_lcas.extend(res) + lcas = next_lcas[:] + self.assertEqual(set(lcas), set(['2']))