diff --git a/dulwich/merge_base.py b/dulwich/merge_base.py
new file mode 100755
index 00000000..99293e97
--- /dev/null
+++ b/dulwich/merge_base.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# vim:ts=4:sw=4:softtabstop=4:smarttab:expandtab
+# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# for a copy of the GNU General Public License
+# and for a copy of the Apache
+# License, Version 2.0.
+
+"""
+Implementation of merge-base following the approach of git
+"""
+
+from collections import deque
+
+
+def _find_lcas(lookup_parents, c1, c2s):
+ cands = []
+ cstates = {}
+
+ # Flags to Record State
+ _ANC_OF_1 = 1 # ancestor of commit 1
+ _ANC_OF_2 = 2 # ancestor of commit 2
+ _DNC = 4 # Do Not Consider
+ _LCA = 8 # potential LCA
+
+ def _has_candidates(wlst, cstates):
+ for cmt in wlst:
+ if cmt in cstates:
+ if not (cstates[cmt] & _DNC):
+ return True
+ return False
+
+ # initialize the working list
+ wlst = deque()
+ cstates[c1] = _ANC_OF_1
+ wlst.append(c1)
+ for c2 in c2s:
+ cstates[c2] = _ANC_OF_2
+ wlst.append(c2)
+
+ # loop until no other LCA candidates are viable in working list
+ # adding any parents to the list in a breadth first manner
+ while _has_candidates(wlst, cstates):
+ cmt = wlst.popleft()
+ flags = cstates[cmt]
+ if flags == (_ANC_OF_1 | _ANC_OF_2):
+ # potential common ancestor
+ if not (flags & _LCA):
+ flags = flags | _LCA
+ cstates[cmt] = flags
+ cands.append(cmt)
+ # mark any parents of this node _DNC as all parents
+ # would be one level further removed common ancestors
+ flags = flags | _DNC
+ parents = lookup_parents(cmt)
+ if parents:
+ for pcmt in parents:
+ if pcmt in cstates:
+ cstates[pcmt] = cstates[pcmt] | flags
+ else:
+ cstates[pcmt] = flags
+ wlst.append(pcmt)
+
+ # walk final candidates removing any superceded by _DNC by later lower LCAs
+ results = []
+ for cmt in cands:
+ if not (cstates[cmt] & _DNC):
+ results.append(cmt)
+ return results
+
+
+def find_merge_base(object_store, commit_ids):
+ """Find lowest common ancestors of commit_ids[0] and *any* of commits_ids[1:]
+
+ Args:
+ object_store: object store
+ commit_ids: list of commit ids
+ Returns:
+ list of lowest common ancestor commit_ids
+ """
+ def lookup_parents(commit_id):
+ return object_store[commit_id].parents
+
+ if not commit_ids:
+ return []
+ c1 = commit_ids[0]
+ if not len(commit_ids) > 1:
+ return [c1]
+ c2s = commit_ids[1:]
+ if c1 in c2s:
+ return [c1]
+ return _find_lcas(lookup_parents, c1, c2s)
+
+
+def find_octopus_base(object_store, commit_ids):
+ """Find lowest common ancestors of *all* provided commit_ids
+
+ Args:
+ object_store: Object store
+ commit_ids: list of commit ids
+ Returns:
+ list of lowest common ancestor commit_ids
+ """
+
+ def lookup_parents(commit_id):
+ return object_store[commit_id].parents
+
+ if not commit_ids:
+ return []
+ if len(commit_ids) <= 2:
+ return find_merge_base(object_store, commit_ids)
+ lcas = [commit_ids[0]]
+ others = commit_ids[1:]
+ for cmt in others:
+ next_lcas = []
+ for ca in lcas:
+ res = _find_lcas(lookup_parents, cmt, [ca])
+ next_lcas.extend(res)
+ lcas = next_lcas[:]
+ return lcas
diff --git a/dulwich/tests/__init__.py b/dulwich/tests/__init__.py
index f46cff60..351665d6 100644
--- a/dulwich/tests/__init__.py
+++ b/dulwich/tests/__init__.py
@@ -1,198 +1,199 @@
# __init__.py -- The tests for dulwich
# Copyright (C) 2007 James Westby
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Tests for Dulwich."""
import doctest
import os
import shutil
import subprocess
import sys
import tempfile
# If Python itself provides an exception, use that
import unittest
from unittest import ( # noqa: F401
SkipTest,
TestCase as _TestCase,
skipIf,
expectedFailure,
)
class TestCase(_TestCase):
def setUp(self):
super(TestCase, self).setUp()
self._old_home = os.environ.get("HOME")
os.environ["HOME"] = "/nonexistant"
def tearDown(self):
super(TestCase, self).tearDown()
if self._old_home:
os.environ["HOME"] = self._old_home
else:
del os.environ["HOME"]
class BlackboxTestCase(TestCase):
"""Blackbox testing."""
# TODO(jelmer): Include more possible binary paths.
bin_directories = [os.path.abspath(os.path.join(
os.path.dirname(__file__), "..", "..", "bin")), '/usr/bin',
'/usr/local/bin']
def bin_path(self, name):
"""Determine the full path of a binary.
Args:
name: Name of the script
Returns: Full path
"""
for d in self.bin_directories:
p = os.path.join(d, name)
if os.path.isfile(p):
return p
else:
raise SkipTest("Unable to find binary %s" % name)
def run_command(self, name, args):
"""Run a Dulwich command.
Args:
name: Name of the command, as it exists in bin/
args: Arguments to the command
"""
env = dict(os.environ)
env["PYTHONPATH"] = os.pathsep.join(sys.path)
# Since they don't have any extensions, Windows can't recognize
# executablility of the Python files in /bin. Even then, we'd have to
# expect the user to set up file associations for .py files.
#
# Save us from all that headache and call python with the bin script.
argv = [sys.executable, self.bin_path(name)] + args
return subprocess.Popen(
argv,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
def self_test_suite():
names = [
'archive',
'blackbox',
'client',
'config',
'diff_tree',
'fastexport',
'file',
'grafts',
'greenthreads',
'hooks',
'ignore',
'index',
'lfs',
'line_ending',
'lru_cache',
'mailmap',
+ 'merge_base',
'objects',
'objectspec',
'object_store',
'missing_obj_finder',
'pack',
'patch',
'porcelain',
'protocol',
'reflog',
'refs',
'repository',
'server',
'stash',
'utils',
'walk',
'web',
]
module_names = ['dulwich.tests.test_' + name for name in names]
loader = unittest.TestLoader()
return loader.loadTestsFromNames(module_names)
def tutorial_test_suite():
import dulwich.client # noqa: F401
import dulwich.config # noqa: F401
import dulwich.index # noqa: F401
import dulwich.reflog # noqa: F401
import dulwich.repo # noqa: F401
import dulwich.server # noqa: F401
import dulwich.patch # noqa: F401
tutorial = [
'introduction',
'file-format',
'repo',
'object-store',
'remote',
'conclusion',
]
tutorial_files = ["../../docs/tutorial/%s.txt" % name for name in tutorial]
def setup(test):
test.__old_cwd = os.getcwd()
test.tempdir = tempfile.mkdtemp()
test.globs.update({'tempdir': test.tempdir})
os.chdir(test.tempdir)
def teardown(test):
os.chdir(test.__old_cwd)
shutil.rmtree(test.tempdir)
return doctest.DocFileSuite(
module_relative=True, package='dulwich.tests',
setUp=setup, tearDown=teardown, *tutorial_files)
def nocompat_test_suite():
result = unittest.TestSuite()
result.addTests(self_test_suite())
result.addTests(tutorial_test_suite())
from dulwich.contrib import test_suite as contrib_test_suite
result.addTests(contrib_test_suite())
return result
def compat_test_suite():
result = unittest.TestSuite()
from dulwich.tests.compat import test_suite as compat_test_suite
result.addTests(compat_test_suite())
return result
def test_suite():
result = unittest.TestSuite()
result.addTests(self_test_suite())
if sys.platform != 'win32':
result.addTests(tutorial_test_suite())
from dulwich.tests.compat import test_suite as compat_test_suite
result.addTests(compat_test_suite())
from dulwich.contrib import test_suite as contrib_test_suite
result.addTests(contrib_test_suite())
return result
diff --git a/dulwich/tests/test_merge_base.py b/dulwich/tests/test_merge_base.py
new file mode 100644
index 00000000..6584d9ee
--- /dev/null
+++ b/dulwich/tests/test_merge_base.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+# test_index.py -- Tests for merge
+# encoding: utf-8
+# Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# for a copy of the GNU General Public License
+# and for a copy of the Apache
+# License, Version 2.0.
+
+"""Tests for merge_base."""
+
+from dulwich.tests import TestCase
+
+from dulwich.merge_base import _find_lcas
+
+
+class FindMergeBaseTests(TestCase):
+
+ @staticmethod
+ def run_test(dag, inputs):
+ def lookup_parents(commit_id):
+ return dag[commit_id]
+ c1 = inputs[0]
+ c2s = inputs[1:]
+ return set(_find_lcas(lookup_parents, c1, c2s))
+
+ def test_multiple_lca(self):
+ # two lowest common ancestors
+ graph = {
+ '5': ['1', '2'],
+ '4': ['3', '1'],
+ '3': ['2'],
+ '2': ['0'],
+ '1': [],
+ '0': []
+ }
+ self.assertEqual(self.run_test(graph, ['4', '5']), set(['1', '2']))
+
+ def test_no_common_ancestor(self):
+ # no common ancestor
+ graph = {
+ '4': ['2'],
+ '3': ['1'],
+ '2': [],
+ '1': ['0'],
+ '0': [],
+ }
+ self.assertEqual(self.run_test(graph, ['4', '3']), set([]))
+
+ def test_ancestor(self):
+ # ancestor
+ graph = {
+ 'G': ['D', 'F'],
+ 'F': ['E'],
+ 'D': ['C'],
+ 'C': ['B'],
+ 'E': ['B'],
+ 'B': ['A'],
+ 'A': []
+ }
+ self.assertEqual(self.run_test(graph, ['D', 'C']), set(['C']))
+
+ def test_direct_parent(self):
+ # parent
+ graph = {
+ 'G': ['D', 'F'],
+ 'F': ['E'],
+ 'D': ['C'],
+ 'C': ['B'],
+ 'E': ['B'],
+ 'B': ['A'],
+ 'A': []
+ }
+ self.assertEqual(self.run_test(graph, ['G', 'D']), set(['D']))
+
+ def test_another_crossover(self):
+ # Another cross over
+ graph = {
+ 'G': ['D', 'F'],
+ 'F': ['E', 'C'],
+ 'D': ['C', 'E'],
+ 'C': ['B'],
+ 'E': ['B'],
+ 'B': ['A'],
+ 'A': []
+ }
+ self.assertEqual(self.run_test(graph, ['D', 'F']), set(['E', 'C']))
+
+ def test_three_way_merge_lca(self):
+ # three way merge commit straight from git docs
+ graph = {
+ 'C': ['C1'],
+ 'C1': ['C2'],
+ 'C2': ['C3'],
+ 'C3': ['C4'],
+ 'C4': ['2'],
+ 'B': ['B1'],
+ 'B1': ['B2'],
+ 'B2': ['B3'],
+ 'B3': ['1'],
+ 'A': ['A1'],
+ 'A1': ['A2'],
+ 'A2': ['A3'],
+ 'A3': ['1'],
+ '1': ['2'],
+ '2': [],
+ }
+ # assumes a theoretical merge M exists that merges B and C first
+ # which actually means find the first LCA from either of B OR C with A
+ self.assertEqual(self.run_test(graph, ['A', 'B', 'C']), set(['1']))
+
+ def test_octopus(self):
+ # octopus algorithm test
+ # test straight from git docs of A, B, and C
+ # but this time use octopus to find lcas of A, B, and C simultaneously
+ graph = {
+ 'C': ['C1'],
+ 'C1': ['C2'],
+ 'C2': ['C3'],
+ 'C3': ['C4'],
+ 'C4': ['2'],
+ 'B': ['B1'],
+ 'B1': ['B2'],
+ 'B2': ['B3'],
+ 'B3': ['1'],
+ 'A': ['A1'],
+ 'A1': ['A2'],
+ 'A2': ['A3'],
+ 'A3': ['1'],
+ '1': ['2'],
+ '2': [],
+ }
+
+ def lookup_parents(cid):
+ return graph[cid]
+ lcas = ['A']
+ others = ['B', 'C']
+ for cmt in others:
+ next_lcas = []
+ for ca in lcas:
+ res = _find_lcas(lookup_parents, cmt, [ca])
+ next_lcas.extend(res)
+ lcas = next_lcas[:]
+ self.assertEqual(set(lcas), set(['2']))