Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/bin/subvertpy-fast-export b/bin/subvertpy-fast-export
index c12c25df..3f9da508 100755
--- a/bin/subvertpy-fast-export
+++ b/bin/subvertpy-fast-export
@@ -1,231 +1,231 @@
#!/usr/bin/python
#
# svn-fast-export.py
# ----------
# Walk through each revision of a local Subversion repository and export it
# in a stream that git-fast-import can consume.
#
# Author: Chris Lee <clee@kde.org>
# License: MIT <http://www.opensource.org/licenses/mit-license.php>
#
# Adapted for subvertpy by Jelmer Vernooij <jelmer@samba.org>
# Default repository layout paths.  Each one can be overridden from the
# command line (see the option handling under the __main__ guard).
trunk_path = '/trunk/'
branches_path = '/branches/'
tags_path = '/tags/'
# Domain appended to the Subversion author name to build the committer
# e-mail address.
address = 'localhost'
-from cStringIO import StringIO
+from io import BytesIO
import sys, os.path
from optparse import OptionParser
import stat
from time import mktime, strptime
from subvertpy.repos import PATH_CHANGE_DELETE, Repository
# Short change-type codes.  NOTE(review): not referenced anywhere in the
# visible part of this script -- confirm whether it is still needed.
ct_short = ['M', 'A', 'D', 'R', 'X']
def dump_file_blob(root, stream, stream_length):
    """Write one fast-import ``data`` section to standard output.

    :param root: Revision root the blob belongs to (unused here; kept so
        existing callers keep working).
    :param stream: Readable object whose ``read()`` returns the blob contents.
    :param stream_length: Length announced in the ``data`` header.
    """
    out = sys.stdout
    out.write("data %s\n" % stream_length)
    # Flush the header before touching the underlying binary stream so the
    # text layer and the byte layer cannot reorder the output.
    out.flush()
    payload = stream.read()
    if isinstance(payload, bytes):
        # On Python 3 sys.stdout only accepts text, so raw bytes have to go
        # to its binary buffer.  Python 2 file objects have no ``buffer``
        # attribute and accept byte strings directly.
        raw = getattr(out, "buffer", out)
        raw.write(payload)
        raw.flush()
    else:
        out.write(payload)
    out.write("\n")
class Matcher(object):
    """Base class deciding which repository paths belong to the export.

    Subclasses provide ``matches(path)`` and ``replace(path)``.
    """

    # Name of the git branch the exported commits are written to.
    branch = None

    def __init__(self, trunk_path):
        self.trunk_path = trunk_path

    def branchname(self):
        """Return the branch name commits should be written to."""
        return self.branch

    def __str__(self):
        return super(Matcher, self).__str__() + ":" + self.trunk_path

    @staticmethod
    def getMatcher(trunk_path):
        """Build the matcher for ``trunk_path``.

        A value starting with ``regex:`` selects :class:`RegexStringMatcher`;
        anything else is treated as a literal path prefix.
        """
        if trunk_path.startswith("regex:"):
            return RegexStringMatcher(trunk_path)
        return StaticStringMatcher(trunk_path)


class StaticStringMatcher(Matcher):
    """Matcher for a fixed path prefix; always exports to ``master``."""

    branch = "master"

    def __init__(self, trunk_path):
        if not trunk_path.startswith("/"):
            raise ValueError("Trunk path does not start with a slash (/)")
        Matcher.__init__(self, trunk_path)

    def matches(self, path):
        """Return True if ``path`` lies below the trunk path."""
        return path.startswith(self.trunk_path)

    def replace(self, path):
        """Return ``path`` with the leading trunk path removed.

        Only the prefix is stripped: a later repetition of the trunk path
        inside ``path`` (e.g. ``/trunk/vendor/trunk/x``) is part of the file
        name and must be preserved.  Paths that do not start with the trunk
        path are returned unchanged.
        """
        if path.startswith(self.trunk_path):
            return path[len(self.trunk_path):]
        return path


class RegexStringMatcher(Matcher):
    """Matcher driven by a regular expression given as ``regex:<pattern>``.

    Group 1 of the pattern supplies the branch name, group 2 the path of the
    file inside that branch.
    """

    def __init__(self, trunk_path):
        super(RegexStringMatcher, self).__init__(trunk_path)
        import re
        self.matcher = re.compile(self.trunk_path[len("regex:"):])

    def matches(self, path):
        """Return True if ``path`` matches; remembers group 1 as the branch."""
        match = self.matcher.match(path)
        if not match:
            return False
        # Side effect: branchname() reports the branch of the last match.
        self.branch = match.group(1)
        return True

    def replace(self, path):
        """Return ``path`` with each pattern match replaced by its group 2."""
        # Raw string: "\g" is not a valid escape in a normal string literal.
        return self.matcher.sub(r"\g<2>", path)
# Path matcher used by export_revision(); assigned under the __main__ guard
# once the command-line options have been parsed.
MATCHER = None
def export_revision(rev, fs):
    """Emit revision ``rev`` of ``fs`` as a git fast-import commit on stdout.

    Every changed file accepted by the module-level ``MATCHER`` is written as
    a blob, followed by one commit that references the blobs by mark.
    Revisions that touch no matching file are skipped.

    :param rev: Revision number to export.
    :param fs: Filesystem object providing ``revision_root`` and
        ``revision_proplist``.
    """
    # Imported locally: the file header only imports mktime/strptime.
    from calendar import timegm

    sys.stderr.write("Exporting revision %s... " % rev)

    # Open a root object representing the revision being exported.
    root = fs.revision_root(rev)

    # And the list of what changed in this revision.
    changes = root.paths_changed()

    i = 1
    marks = {}
    file_changes = []
    # Bytes literal: the contents are wrapped in BytesIO below, so they are
    # handled as bytes (on Python 2 this is an ordinary str).
    symlink_prefix = b"link "

    for path, (node_id, change_type, text_changed, prop_changed) in changes.items():
        if root.is_dir(path):
            continue
        if not MATCHER.matches(path):
            # We don't handle branches. Or tags. Yet.
            continue
        if change_type == PATH_CHANGE_DELETE:
            file_changes.append("D %s" % MATCHER.replace(path).lstrip("/"))
            continue

        props = root.proplist(path)
        marks[i] = MATCHER.replace(path)
        if props.get("svn:special", ""):
            contents = root.file_content(path).read()
            if not contents.startswith(symlink_prefix):
                sys.stderr.write("special file '%s' is not a symlink, ignoring...\n" % path)
                continue
            mode = stat.S_IFLNK
            stream = BytesIO(contents[len(symlink_prefix):])
            stream_length = len(stream.getvalue())
        else:
            if props.get("svn:executable", ""):
                mode = 0o755
            else:
                mode = 0o644
            stream_length = root.file_length(path)
            stream = root.file_content(path)
        file_changes.append("M %o :%s %s" % (
            mode, i, marks[i].lstrip("/")))
        sys.stdout.write("blob\nmark :%s\n" % i)
        dump_file_blob(root, stream, stream_length)
        stream.close()
        i += 1

    # Get the commit author and message
    props = fs.revision_proplist(rev)

    if 'svn:author' in props:
        author = "%s <%s@%s>" % (props['svn:author'], props['svn:author'], address)
    else:
        author = 'nobody <nobody@localhost>'

    if len(file_changes) == 0:
        sys.stderr.write("skipping.\n")
        return

    # Drop the trailing fractional seconds and "Z" (8 characters).
    svndate = props['svn:date'][0:-8]
    # The committer line below declares a -0000 offset, so the timestamp must
    # be computed as UTC; mktime() would interpret it in the local timezone.
    commit_time = timegm(strptime(svndate, '%Y-%m-%dT%H:%M:%S'))
    sys.stdout.write("commit refs/heads/%s\n" % MATCHER.branchname())
    sys.stdout.write("committer %s %s -0000\n" % (author, int(commit_time)))
    # NOTE(review): len() counts characters if svn:log is text; fast-import
    # expects a byte count -- confirm for non-ASCII log messages.
    sys.stdout.write("data %s\n" % len(props['svn:log']))
    sys.stdout.write(props['svn:log'])
    sys.stdout.write("\n")
    sys.stdout.write('\n'.join(file_changes))
    sys.stdout.write("\n\n")

    sys.stderr.write("done!\n")
def crawl_revisions(repos_path, first_rev=None, final_rev=None):
    """Open the repository at REPOS_PATH and export its revisions in order.

    :param repos_path: Path of the local Subversion repository.
    :param first_rev: First revision to export; defaults to 1.
    :param final_rev: Last revision to export (inclusive); defaults to the
        youngest revision of the repository.
    """
    # Open the repository at REPOS_PATH, and get a reference to its
    # versioning filesystem.
    fs_obj = Repository(repos_path).fs()

    if first_rev is None:
        first_rev = 1
    if final_rev is None:
        # Query the current youngest revision.
        final_rev = fs_obj.youngest_revision()

    # Plain counter instead of xrange(): works on Python 2 and 3 alike and
    # never materialises the revision list.
    rev = first_rev
    while rev <= final_rev:
        export_revision(rev, fs_obj)
        rev += 1
if __name__ == '__main__':
    # Command-line entry point: parse the options, build the path matcher
    # and export the requested revision range to stdout.
    usage = '%prog [options] REPOS_PATH'
    parser = OptionParser()
    parser.set_usage(usage)
    parser.add_option('-f', '--final-rev', help='Final revision to import',
                      dest='final_rev', metavar='FINAL_REV', type='int')
    parser.add_option('-r', '--first-rev', help='First revision to import',
                      dest='first_rev', metavar='FIRST_REV', type='int')
    parser.add_option('-t', '--trunk-path', help="Path in repo to /trunk, may be `regex:/cvs/(trunk)/proj1/(.*)`\nFirst group is used as branchname, second to match files",
                      dest='trunk_path', metavar='TRUNK_PATH')
    parser.add_option('-b', '--branches-path', help='Path in repo to /branches',
                      dest='branches_path', metavar='BRANCHES_PATH')
    parser.add_option('-T', '--tags-path', help='Path in repo to /tags',
                      dest='tags_path', metavar='TAGS_PATH')
    parser.add_option('-a', '--address', help='Domain to put on users for their mail address',
                      dest='address', metavar='hostname', type='string')
    parser.add_option("--version", help="Print version and exit", action="store_true")
    (options, args) = parser.parse_args()

    if options.version:
        import subvertpy
        # Function-call form: valid on Python 2 and 3 (the print statement is
        # a syntax error on Python 3).
        print(".".join(str(x) for x in subvertpy.__version__))
        sys.exit(0)

    # Options left unset keep the module-level defaults.
    if options.trunk_path is not None:
        trunk_path = options.trunk_path
    if options.branches_path is not None:
        branches_path = options.branches_path
    if options.tags_path is not None:
        tags_path = options.tags_path
    if options.address is not None:
        address = options.address

    MATCHER = Matcher.getMatcher(trunk_path)
    sys.stderr.write("%s\n" % MATCHER)
    if len(args) != 1:
        parser.print_help()
        sys.exit(2)

    # Canonicalize (enough for Subversion, at least) the repository path.
    repos_path = os.path.normpath(args[0])
    if repos_path == '.':
        repos_path = ''

    try:
        # On Windows, switch stdout to binary mode so the stream is not
        # altered by newline translation.
        import msvcrt
        msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
    except ImportError:
        pass

    crawl_revisions(repos_path, first_rev=options.first_rev,
                    final_rev=options.final_rev)
diff --git a/examples/ra_commit.py b/examples/ra_commit.py
index 36d6160a..2f51c308 100755
--- a/examples/ra_commit.py
+++ b/examples/ra_commit.py
@@ -1,45 +1,45 @@
#!/usr/bin/python
# Demonstrates how to do a new commit using Subvertpy
import os
-from cStringIO import StringIO
+from io import BytesIO
from subvertpy import delta, repos
from subvertpy.ra import RemoteAccess, Auth, get_username_provider
# Create a fresh repository in the current directory.
repos.create("tmprepo")

# Connect to the "remote" repository using the file transport.
# A username provider has to be supplied so that Subversion knows who to
# record as the author of commits made over this connection.
repo_url = "file://%s" % os.path.abspath("tmprepo")
auth = Auth([get_username_provider()])
conn = RemoteAccess(repo_url, auth=auth)

# First commit: add a directory and a file.
editor = conn.get_commit_editor({"svn:log": "Commit message"})
root = editor.open_root()

# Add a directory
new_dir = root.add_directory("somedir")
new_dir.close()

# Add and edit a file
new_file = root.add_file("somefile")
# Set the svn:executable attribute
new_file.change_prop("svn:executable", "*")
# Obtain a textdelta handler and send the new file contents
txdelta = new_file.apply_textdelta()
contents = BytesIO(b"new file contents")
delta.send_stream(contents, txdelta)
new_file.close()

root.close()
editor.close()

# Second commit: rename the directory (copy it, then delete the original).
editor = conn.get_commit_editor({"svn:log": "Commit message"})
root = editor.open_root()

# Create a new directory copied from somedir:1
copied_dir = root.add_directory("new dir name", "%s/somedir" % repo_url, 1)
copied_dir.close()

# Remove the original directory
root.delete_entry("somedir")

root.close()
editor.close()
diff --git a/subvertpy/tests/__init__.py b/subvertpy/tests/__init__.py
index 1ca103c8..b12ffd22 100644
--- a/subvertpy/tests/__init__.py
+++ b/subvertpy/tests/__init__.py
@@ -1,450 +1,450 @@
# Copyright (C) 2006-2008 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
"""Tests for subvertpy."""
__author__ = 'Jelmer Vernooij <jelmer@samba.org>'
__docformat__ = 'restructuredText'
-from cStringIO import StringIO
+from io import BytesIO
import os
import shutil
import stat
import sys
import tempfile
import unittest
try:
from unittest import SkipTest
except ImportError:
try:
from unittest2 import SkipTest
except ImportError:
from testtools.testcase import TestSkipped as SkipTest
import urllib2
import urllib
import urlparse
from subvertpy import (
client,
delta,
properties,
ra,
repos,
)
from subvertpy.ra import (
Auth,
RemoteAccess,
)
def rmtree_with_readonly(path):
    """Remove a directory tree, including entries marked read-only.

    On Windows a read-only file cannot be removed, which makes a plain
    shutil.rmtree fail; the error handler makes the entry writable and
    retries the failed operation.
    """
    write_bits = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH

    def make_writable_and_retry(failed_call, failed_path, excinfo):
        current_mode = os.stat(failed_path).st_mode
        os.chmod(failed_path, current_mode | write_bits)
        failed_call(failed_path)

    shutil.rmtree(path, onerror=make_writable_and_retry)
class TestCase(unittest.TestCase):
    """Base test case.

    :note: Adds assertIsInstance and assertIs.
    """

    def assertIsInstance(self, obj, kls, msg=None):
        """Fail if obj is not an instance of kls"""
        if isinstance(obj, kls):
            return
        if msg is None:
            msg = "%r is an instance of %s rather than %s" % (
                obj, obj.__class__, kls)
        self.fail(msg)

    def assertIs(self, left, right, message=None):
        """Raise AssertionError unless left and right are the same object."""
        if left is right:
            return
        if message is None:
            message = "%r is not %r." % (left, right)
        raise AssertionError(message)
class TestCaseInTempDir(TestCase):
    """Test case that runs in a temporary directory."""

    def setUp(self):
        # Remember where we were, then move into a fresh scratch directory.
        TestCase.setUp(self)
        self._oldcwd = os.getcwd()
        scratch = tempfile.mkdtemp()
        self.test_dir = scratch
        os.chdir(scratch)

    def tearDown(self):
        # Leave the scratch directory before deleting it.
        TestCase.tearDown(self)
        os.chdir(self._oldcwd)
        rmtree_with_readonly(self.test_dir)
class TestFileEditor(object):
    """Simple file editor wrapper that doesn't require closing."""

    def __init__(self, file):
        # ``file`` is the wrapped file editor; calls are forwarded to it.
        self.file = file
        self.is_closed = False

    def change_prop(self, name, value):
        """Forward a property change to the wrapped file editor."""
        self.file.change_prop(name, value)

    def modify(self, contents=None):
        """Send new contents for the file.

        :param contents: Byte string with the new contents; 100 random bytes
            are used when omitted.
        """
        if contents is None:
            # os.urandom is available on Python 2 and 3;
            # urllib2.randombytes only exists on Python 2.
            contents = os.urandom(100)
        txdelta = self.file.apply_textdelta()
        delta.send_stream(BytesIO(contents), txdelta)

    def close(self):
        """Close the wrapped file editor; may only be called once."""
        assert not self.is_closed
        self.is_closed = True
        self.file.close()
class TestDirEditor(object):
    """Simple dir editor wrapper that doesn't require closing."""

    def __init__(self, dir, baseurl, revnum):
        self.dir = dir
        self.baseurl = baseurl
        self.revnum = revnum
        self.is_closed = False
        # Child editors handed out so far, in creation order.
        self.children = []

    def _remember(self, child):
        """Record ``child`` so it can be closed later, and return it."""
        self.children.append(child)
        return child

    def _resolve_copyfrom(self, copyfrom_path, copyfrom_rev):
        """Make a copy source absolute and default its revision."""
        if copyfrom_path is None:
            return copyfrom_path, copyfrom_rev
        copyfrom_path = urlparse.urljoin(self.baseurl+"/", copyfrom_path)
        if copyfrom_rev == -1:
            copyfrom_rev = self.revnum
        return copyfrom_path, copyfrom_rev

    def close_children(self):
        """Close every child that is still open, most recent first."""
        for child in reversed(self.children):
            if child.is_closed:
                continue
            child.close()

    def close(self):
        """Close the open children and then this directory; only once."""
        assert not self.is_closed
        self.is_closed = True
        self.close_children()
        self.dir.close()

    def change_prop(self, name, value):
        """Change a property on this directory."""
        self.close_children()
        self.dir.change_prop(name, value)

    def open_dir(self, path):
        """Open an existing subdirectory and return its editor wrapper."""
        self.close_children()
        opened = self.dir.open_directory(path, -1)
        return self._remember(TestDirEditor(opened, self.baseurl, self.revnum))

    def open_file(self, path):
        """Open an existing file and return its editor wrapper."""
        self.close_children()
        opened = self.dir.open_file(path, -1)
        return self._remember(TestFileEditor(opened))

    def add_dir(self, path, copyfrom_path=None, copyfrom_rev=-1):
        """Add a directory, optionally copied from ``copyfrom_path``."""
        self.close_children()
        copyfrom_path, copyfrom_rev = self._resolve_copyfrom(
            copyfrom_path, copyfrom_rev)
        if copyfrom_path is None:
            assert copyfrom_rev == -1
        else:
            assert copyfrom_rev > -1
        added = self.dir.add_directory(path, copyfrom_path, copyfrom_rev)
        return self._remember(TestDirEditor(added, self.baseurl, self.revnum))

    def add_file(self, path, copyfrom_path=None, copyfrom_rev=-1):
        """Add a file, optionally copied from ``copyfrom_path``."""
        self.close_children()
        copyfrom_path, copyfrom_rev = self._resolve_copyfrom(
            copyfrom_path, copyfrom_rev)
        added = self.dir.add_file(path, copyfrom_path, copyfrom_rev)
        return self._remember(TestFileEditor(added))

    def delete(self, path):
        """Delete the entry ``path`` from this directory."""
        self.close_children()
        self.dir.delete_entry(path)
class TestCommitEditor(TestDirEditor):
    """Simple commit editor wrapper."""

    def __init__(self, editor, baseurl, revnum):
        # The root directory of the commit acts as this editor's directory.
        self.editor = editor
        root = self.editor.open_root()
        TestDirEditor.__init__(self, root, baseurl, revnum)

    def close(self):
        """Close the directory tree first, then the commit editor itself."""
        TestDirEditor.close(self)
        self.editor.close()
class SubversionTestCase(TestCaseInTempDir):
    """A test case that provides the ability to build Subversion
    repositories."""

    def _init_client(self):
        """Create the client context used by the ``client_*`` helpers."""
        self.client_ctx = client.Client()
        self.client_ctx.auth = Auth([ra.get_simple_provider(),
                                     ra.get_username_provider(),
                                     ra.get_ssl_client_cert_file_provider(),
                                     ra.get_ssl_client_cert_pw_file_provider(),
                                     ra.get_ssl_server_trust_file_provider()])
        self.client_ctx.log_msg_func = self.log_message_func
        #self.client_ctx.notify_func = lambda err: mutter("Error: %s" % err)

    def setUp(self):
        super(SubversionTestCase, self).setUp()
        self._init_client()

    def tearDown(self):
        del self.client_ctx
        super(SubversionTestCase, self).tearDown()

    def log_message_func(self, items):
        """Return the commit message stored by :meth:`client_commit`."""
        return self.next_message

    def make_repository(self, relpath, allow_revprop_changes=True):
        """Create a repository.

        :param relpath: Path of the repository, relative to the test
            directory.
        :param allow_revprop_changes: If true, install a
            pre-revprop-change hook.
        :return: ``file:`` URL of the repository.
        """
        abspath = os.path.join(self.test_dir, relpath)

        repos.create(abspath)

        if allow_revprop_changes:
            if sys.platform == 'win32':
                revprop_hook = os.path.join(abspath, "hooks",
                                            "pre-revprop-change.bat")
                with open(revprop_hook, 'w') as f:
                    f.write("exit 0\n")
            else:
                revprop_hook = os.path.join(abspath, "hooks",
                                            "pre-revprop-change")
                with open(revprop_hook, 'w') as f:
                    f.write("#!/bin/sh\n")
                # The hook must be executable.
                os.chmod(revprop_hook, os.stat(revprop_hook).st_mode | 0o111)

        if sys.platform == 'win32':
            return 'file:%s' % urllib.pathname2url(abspath)
        else:
            return "file://%s" % abspath

    def make_checkout(self, repos_url, relpath):
        """Create a new checkout."""
        self.client_ctx.checkout(repos_url, relpath, "HEAD")

    def client_set_prop(self, path, name, value):
        """Set a property on a local file or directory."""
        if value is None:
            value = ""
        self.client_ctx.propset(name, value, path, False, True)

    def client_get_prop(self, path, name, revnum=None, recursive=False):
        """Retrieve a property from a local or remote file or directory."""
        if revnum is None:
            rev = "WORKING"
        else:
            rev = revnum
        ret = self.client_ctx.propget(name, path, rev, rev, recursive)
        if recursive:
            return ret
        else:
            # list(): dict views cannot be indexed on Python 3.
            return list(ret.values())[0]

    def client_get_revprop(self, url, revnum, name):
        """Get the revision property.

        :param url: URL of the repository
        :param revnum: Revision number
        :param name: Property name
        :return: Revision property value
        """
        r = ra.RemoteAccess(url)
        return r.rev_proplist(revnum)[name]

    def client_set_revprop(self, url, revnum, name, value):
        """Set a revision property on a repository.

        :param url: URL of the repository
        :param revnum: Revision number of the revision
        :param name: Name of the property
        :param value: Value of the property, None to remove
        """
        r = ra.RemoteAccess(url, auth=Auth([ra.get_username_provider()]))
        r.change_rev_prop(revnum, name, value)

    def client_resolve(self, path, choice, depth=0):
        """Resolve a conflict set on a local path."""
        self.client_ctx.resolve(path, depth, choice)

    def client_commit(self, dir, message=None, recursive=True):
        """Commit current changes in specified working copy.

        :param dir: Working copy directory to commit from.
        :param message: Commit message handed to the client.
        :param recursive: Passed through to the client's commit call.
        :return: Commit info returned by the client.
        """
        olddir = os.path.abspath('.')
        self.next_message = message
        os.chdir(dir)
        try:
            info = self.client_ctx.commit(["."], recursive, False)
        finally:
            # Always go back, so a failing commit does not leave later code
            # running inside the working copy.
            os.chdir(olddir)
        assert info is not None
        return info

    def client_add(self, relpath, recursive=True):
        """Add specified files to working copy.

        :param relpath: Path to the files to add.
        """
        self.client_ctx.add(relpath, recursive, False, False)

    def client_log(self, url, start_revnum, stop_revnum):
        """Fetch the log

        :param url: URL to log
        :param start_revnum: Start revision of the range to log over
        :param stop_revnum: Stop revision of the range to log over
        :return: Dictionary mapping each revision to a tuple of
            (changed paths, author, date, log message)
        """
        r = ra.RemoteAccess(url)
        assert isinstance(url, str)
        ret = {}

        def rcvr(orig_paths, rev, revprops, has_children=None):
            ret[rev] = (orig_paths,
                        revprops.get(properties.PROP_REVISION_AUTHOR),
                        revprops.get(properties.PROP_REVISION_DATE),
                        revprops.get(properties.PROP_REVISION_LOG))
        r.get_log(rcvr, [""], start_revnum, stop_revnum, 0, True, True,
                  revprops=[properties.PROP_REVISION_AUTHOR,
                            properties.PROP_REVISION_DATE,
                            properties.PROP_REVISION_LOG])
        return ret

    def client_delete(self, relpath):
        """Remove specified files from working copy.

        :param relpath: Path to the files to remove.
        """
        self.client_ctx.delete([relpath], True)

    def client_copy(self, oldpath, newpath, revnum=None):
        """Copy file in working copy.

        :param oldpath: Relative path to original file.
        :param newpath: Relative path to new file.
        :param revnum: Revision to copy from; "HEAD" when omitted.
        """
        if revnum is None:
            rev = "HEAD"
        else:
            rev = revnum
        self.client_ctx.copy(oldpath, newpath, rev)

    def client_update(self, path):
        """Update path.

        :param path: Path
        """
        self.client_ctx.update([path], "HEAD", True)

    def build_tree(self, files):
        """Create a directory tree.

        :param files: Dictionary with filenames as keys, contents as
            values. None as value indicates a directory.
        """
        # items() works on Python 2 and 3; iteritems() is Python 2 only.
        for name, content in files.items():
            if content is None:
                try:
                    os.makedirs(name)
                except OSError:
                    # Best effort, as before: an existing directory is fine.
                    pass
            else:
                try:
                    os.makedirs(os.path.dirname(name))
                except OSError:
                    pass
                with open(name, 'w') as f:
                    f.write(content)

    def make_client(self, repospath, clientpath, allow_revprop_changes=True):
        """Create a repository and a checkout. Return the checkout.

        :param repospath: Optional relpath to check out if not the full
            repository.
        :param clientpath: Path to checkout
        :return: Repository URL.
        """
        repos_url = self.make_repository(repospath,
            allow_revprop_changes=allow_revprop_changes)
        self.make_checkout(repos_url, clientpath)
        return repos_url

    def open_fs(self, relpath):
        """Open a fs.

        :return: FS.
        """
        return repos.Repository(relpath).fs()

    def get_commit_editor(self, url, message="Test commit"):
        """Obtain a commit editor.

        :param url: URL to connect to
        :param message: Commit message
        :return: Commit editor object
        """
        ra_ctx = RemoteAccess(url.encode("utf-8"),
                              auth=Auth([ra.get_username_provider()]))
        revnum = ra_ctx.get_latest_revnum()
        return TestCommitEditor(ra_ctx.get_commit_editor({"svn:log": message}),
                                ra_ctx.url, revnum)
def test_suite():
    """Build a suite holding the tests of every subvertpy test module."""
    prefix = 'subvertpy.tests.test_'
    module_names = [prefix + name for name in (
        'client',
        'core',
        'delta',
        'marshall',
        'properties',
        'ra',
        'repos',
        'server',
        'wc',
    )]
    result = unittest.TestSuite()
    loaded = unittest.TestLoader().loadTestsFromNames(module_names)
    result.addTests(loaded)
    return result
diff --git a/subvertpy/tests/test_client.py b/subvertpy/tests/test_client.py
index 4d91e7c9..acf02a90 100644
--- a/subvertpy/tests/test_client.py
+++ b/subvertpy/tests/test_client.py
@@ -1,260 +1,260 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subversion client library tests."""
from datetime import datetime, timedelta
import os
-from StringIO import StringIO
+from io import BytesIO
import shutil
import tempfile
from subvertpy import (
SubversionException,
NODE_DIR, NODE_FILE,
client,
ra,
wc,
)
from subvertpy.tests import (
SubversionTestCase,
SkipTest,
TestCase,
)
class VersionTest(TestCase):
    """Sanity checks on the version tuples reported by the client module."""

    def test_version_length(self):
        version = client.version()
        self.assertEqual(4, len(version))

    def test_api_version_length(self):
        api_version = client.api_version()
        self.assertEqual(4, len(api_version))

    def test_api_version_later_same(self):
        # The API version must not be newer than the library version.
        api_version = client.api_version()
        self.assertTrue(api_version <= client.version())
class TestClient(SubversionTestCase):
    """Tests for client.Client against a scratch repository and checkout.

    setUp creates repository "d" and checks it out into "dc".
    """

    def setUp(self):
        super(TestClient, self).setUp()
        self.repos_url = self.make_client("d", "dc")
        self.client = client.Client(auth=ra.Auth([ra.get_username_provider()]))

    def tearDown(self):
        del self.client
        super(TestClient, self).tearDown()

    def test_add(self):
        self.build_tree({"dc/foo": None})
        self.client.add("dc/foo")

    def test_commit(self):
        self.build_tree({"dc/foo": None})
        self.client.add("dc/foo")
        self.client.log_msg_func = lambda c: "Amessage"
        self.client.commit(["dc"])
        r = ra.RemoteAccess(self.repos_url)
        revprops = r.rev_proplist(1)
        self.assertEqual("Amessage", revprops["svn:log"])

    def test_commit_start(self):
        self.build_tree({"dc/foo": None})
        self.client = client.Client(auth=ra.Auth([ra.get_username_provider()]),
                                    log_msg_func=lambda c: "Bmessage")
        self.client.add("dc/foo")
        self.client.commit(["dc"])
        r = ra.RemoteAccess(self.repos_url)
        revprops = r.rev_proplist(1)
        self.assertEqual("Bmessage", revprops["svn:log"])

    def test_mkdir(self):
        self.client.mkdir(["dc/foo"])
        self.client.mkdir("dc/bar")
        self.client.mkdir("dc/bla", revprops={"svn:log": "foo"})

    def test_export(self):
        self.build_tree({"dc/foo": "bla"})
        self.client.add("dc/foo")
        self.client.commit(["dc"])
        self.client.export(self.repos_url, "de")
        self.assertEqual(["foo"], os.listdir("de"))

    def test_add_recursive(self):
        self.build_tree({"dc/trunk/foo": 'bla', "dc/trunk": None})
        self.client.add("dc/trunk")
        if getattr(wc, "WorkingCopy", None) is None:
            raise SkipTest(
                "subversion 1.7 API not supported for WorkingCopy yet")
        adm = wc.WorkingCopy(None, os.path.join(os.getcwd(), "dc"))
        e = adm.entry(os.path.join(os.getcwd(), "dc", "trunk"))
        self.assertEqual(e.kind, NODE_DIR)
        adm2 = wc.WorkingCopy(None, os.path.join(os.getcwd(), "dc", "trunk"))
        e = adm2.entry(os.path.join(os.getcwd(), "dc", "trunk", "foo"))
        self.assertEqual(e.kind, NODE_FILE)
        self.assertEqual(e.revision, 0)

    def test_get_config(self):
        self.assertIsInstance(client.get_config(), client.Config)
        # Create the directory before entering the try block: if mkdtemp
        # itself fails there is nothing to clean up, and the finally clause
        # must not hit an unbound base_dir.
        base_dir = tempfile.mkdtemp()
        try:
            base_dir_basename = os.path.basename(base_dir)
            svn_cfg_dir = os.path.join(base_dir, '.subversion')
            os.mkdir(svn_cfg_dir)
            with open(os.path.join(svn_cfg_dir, 'config'), 'w') as svn_cfg:
                svn_cfg.write('[miscellany]\n')
                svn_cfg.write('global-ignores = %s' % base_dir_basename)
            config = client.get_config(svn_cfg_dir)
            self.assertIsInstance(config, client.Config)
            ignores = config.get_default_ignores()
            self.assertTrue(base_dir_basename in ignores)
        finally:
            shutil.rmtree(base_dir)

    def test_diff(self):
        r = ra.RemoteAccess(self.repos_url,
                            auth=ra.Auth([ra.get_username_provider()]))
        dc = self.get_commit_editor(self.repos_url)
        f = dc.add_file("foo")
        f.modify(b"foo1")
        dc.close()

        dc = self.get_commit_editor(self.repos_url)
        f = dc.open_file("foo")
        f.modify(b"foo2")
        dc.close()

        if client.api_version() < (1, 5):
            self.assertRaises(NotImplementedError, self.client.diff, 1, 2,
                              self.repos_url, self.repos_url)
            return # Skip test

        (outf, errf) = self.client.diff(1, 2, self.repos_url, self.repos_url)
        self.addCleanup(outf.close)
        self.addCleanup(errf.close)
        self.assertEqual(b"""Index: foo
===================================================================
--- foo\t(revision 1)
+++ foo\t(revision 2)
@@ -1 +1 @@
-foo1
\\ No newline at end of file
+foo2
\\ No newline at end of file
""".splitlines(), outf.read().splitlines())
        self.assertEqual(b"", errf.read())

    def assertCatEquals(self, value, revision=None):
        """Assert that ``cat`` of dc/foo at ``revision`` yields ``value``."""
        io = BytesIO()
        self.client.cat("dc/foo", io, revision)
        self.assertEqual(value, io.getvalue())

    def test_cat(self):
        self.build_tree({"dc/foo": "bla"})
        self.client.add("dc/foo")
        self.client.log_msg_func = lambda c: "Commit"
        self.client.commit(["dc"])
        self.assertCatEquals(b"bla")
        self.build_tree({"dc/foo": "blabla"})
        self.client.commit(["dc"])
        self.assertCatEquals(b"blabla")
        self.assertCatEquals(b"bla", revision=1)
        self.assertCatEquals(b"blabla", revision=2)

    def assertLogEntryChangedPathsEquals(self, expected, entry):
        """Assert that the entry's changed paths are exactly ``expected``."""
        changed_paths = entry["changed_paths"]
        self.assertIsInstance(changed_paths, dict)
        self.assertEqual(sorted(expected), sorted(changed_paths.keys()))

    def assertLogEntryMessageEquals(self, expected, entry):
        """Assert that the entry's svn:log revprop equals ``expected``."""
        self.assertEqual(expected, entry["revprops"]["svn:log"])

    def assertLogEntryDateAlmostEquals(self, expected, entry, delta):
        """Assert that the entry's svn:date is within ``delta`` of expected."""
        actual = datetime.strptime(entry["revprops"]["svn:date"], "%Y-%m-%dT%H:%M:%S.%fZ")
        # abs(): without it any commit date earlier than ``expected`` gives a
        # negative difference and passes no matter how far off it is.
        self.assertTrue(abs(actual - expected) < delta)

    def test_log(self):
        log_entries = []
        commit_msg_1 = "Commit"
        commit_msg_2 = "Commit 2"
        delta = timedelta(hours=1)

        def cb(changed_paths, revision, revprops, has_children=False):
            log_entries.append({
                'changed_paths': changed_paths,
                'revision': revision,
                'revprops': revprops,
                'has_children': has_children,
            })

        self.build_tree({"dc/foo": "bla"})
        self.client.add("dc/foo")
        self.client.log_msg_func = lambda c: commit_msg_1
        self.client.commit(["dc"])
        commit_1_dt = datetime.utcnow()
        self.client.log(cb, "dc/foo")
        self.assertEqual(1, len(log_entries))
        self.assertEqual(None, log_entries[0]["changed_paths"])
        self.assertEqual(1, log_entries[0]["revision"])
        self.assertLogEntryMessageEquals(commit_msg_1, log_entries[0])
        self.assertLogEntryDateAlmostEquals(commit_1_dt, log_entries[0], delta)

        self.build_tree({
            "dc/foo": "blabla",
            "dc/bar": "blablabla",
        })
        self.client.add("dc/bar")
        self.client.log_msg_func = lambda c: commit_msg_2
        self.client.commit(["dc"])
        commit_2_dt = datetime.utcnow()

        log_entries = []
        self.client.log(cb, "dc/foo", discover_changed_paths=True)
        self.assertEqual(2, len(log_entries))
        self.assertLogEntryChangedPathsEquals(["/foo", "/bar"], log_entries[0])
        self.assertEqual(2, log_entries[0]["revision"])
        self.assertLogEntryMessageEquals(commit_msg_2, log_entries[0])
        self.assertLogEntryDateAlmostEquals(commit_2_dt, log_entries[0], delta)
        self.assertLogEntryChangedPathsEquals(["/foo"], log_entries[1])
        self.assertEqual(1, log_entries[1]["revision"])
        self.assertLogEntryMessageEquals(commit_msg_1, log_entries[1])
        self.assertLogEntryDateAlmostEquals(commit_1_dt, log_entries[1], delta)

        log_entries = []
        self.client.log(cb, "dc/foo", start_rev=2, end_rev=2, discover_changed_paths=True)
        self.assertEqual(1, len(log_entries))
        self.assertLogEntryChangedPathsEquals(["/foo", "/bar"], log_entries[0])
        self.assertEqual(2, log_entries[0]["revision"])
        self.assertLogEntryMessageEquals(commit_msg_2, log_entries[0])
        self.assertLogEntryDateAlmostEquals(commit_2_dt, log_entries[0], delta)

    def test_info(self):
        self.build_tree({"dc/foo": "bla"})
        self.client.add("dc/foo")
        self.client.log_msg_func = lambda c: "Commit"
        self.client.commit(["dc"])
        info = self.client.info("dc/foo")
        # list(): on Python 3 keys() is a view and never equals a list.
        self.assertEqual(["foo"], list(info.keys()))
        self.assertEqual(1, info["foo"].revision)
        self.assertEqual(3, info["foo"].size)
        if client.api_version() < (1, 7):
            # TODO: Why is this failing on 1.7?
            self.assertEqual(wc.SCHEDULE_NORMAL, info["foo"].wc_info.schedule)
        self.build_tree({"dc/bar": "blablabla"})
        self.client.add(os.path.abspath("dc/bar"))

    def test_info_nonexistant(self):
        self.build_tree({"dc/foo": "bla"})
        self.client.add("dc/foo")
        self.client.log_msg_func = lambda c: "Commit"
        self.client.commit(["dc"])
        self.assertRaises(SubversionException, self.client.info, "dc/missing")
diff --git a/subvertpy/tests/test_delta.py b/subvertpy/tests/test_delta.py
index 7d392048..3bfe88e6 100644
--- a/subvertpy/tests/test_delta.py
+++ b/subvertpy/tests/test_delta.py
@@ -1,80 +1,80 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for subvertpy.delta."""
from io import BytesIO
from subvertpy.delta import (
decode_length,
encode_length,
pack_svndiff0,
send_stream,
unpack_svndiff0,
apply_txdelta_handler,
TXDELTA_NEW, TXDELTA_SOURCE, TXDELTA_TARGET,
)
from subvertpy.tests import TestCase
class DeltaTests(TestCase):
    """Tests for sending streams and applying text delta windows."""

    def setUp(self):
        super(DeltaTests, self).setUp()
        # Windows received by storing_window_handler, in arrival order.
        self.windows = []

    def storing_window_handler(self, window):
        """Window handler that records every window it is given."""
        self.windows.append(window)

    def test_send_stream(self):
        stream = BytesIO(b"foo")
        send_stream(stream, self.storing_window_handler)
        self.assertEqual([(0, 0, 3, 0, [(2, 0, 3)], b'foo'), None],
                         self.windows)

    def test_apply_delta(self):
        stream = BytesIO()
        source = b"(source)"
        handler = apply_txdelta_handler(source, stream)

        new = b"(new)"
        ops = ( # (action, offset, length)
            (TXDELTA_NEW, 0, len(new)),
            (TXDELTA_SOURCE, 0, len(source)),
            (TXDELTA_TARGET, len(new), len("(s")), # Copy "(s"
            (TXDELTA_TARGET, len("(n"), len("ew)")), # Copy "ew)"
            # Copy as target is generated
            (TXDELTA_TARGET, len(new + source), len("(sew)") * 2),
            )
        # Bytes literal: the value is compared with BytesIO.getvalue(),
        # which never equals a text string on Python 3.
        result = b"(new)(source)(sew)(sew)(sew)"

        # (source offset, source length, result length, src_ops, ops, new)
        handler((0, len(source), len(result), 0, ops, new))
        handler(None)
        self.assertEqual(result, stream.getvalue())
class MarshallTests(TestCase):
    """Tests for the svndiff length and window (un)packing helpers."""

    # Bytes literals are used throughout, in line with the other byte-string
    # fixtures in this module; on Python 2 they are plain str, so the
    # assertions are unchanged there.

    def test_encode_length(self):
        self.assertEqual(b"\x81\x02", encode_length(130))

    def test_roundtrip_length(self):
        self.assertEqual((42, b""), decode_length(encode_length(42)))

    def test_roundtrip_window(self):
        mywindow = (0, 0, 3, 1, [(2, 0, 3)], b'foo')
        self.assertEqual([mywindow], list(unpack_svndiff0(pack_svndiff0([mywindow]))))
diff --git a/subvertpy/tests/test_ra.py b/subvertpy/tests/test_ra.py
index 25ba9bb0..451f633a 100644
--- a/subvertpy/tests/test_ra.py
+++ b/subvertpy/tests/test_ra.py
@@ -1,465 +1,465 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subversion ra library tests."""
-from cStringIO import StringIO
+from io import BytesIO
from subvertpy import (
NODE_DIR, NODE_NONE, NODE_UNKNOWN,
SubversionException,
ra,
)
from subvertpy.tests import (
SubversionTestCase,
TestCase,
)
class VersionTest(TestCase):
    """Sanity checks on the version tuples exposed by the ra module."""

    def test_version_length(self):
        version = ra.version()
        self.assertEqual(4, len(version))

    def test_api_version_length(self):
        api_version = ra.api_version()
        self.assertEqual(4, len(api_version))

    def test_api_version_later_same(self):
        # The API version must not be greater than the library version.
        api_version = ra.api_version()
        version = ra.version()
        self.assertTrue(api_version <= version)
class TestRemoteAccessUnknown(TestCase):
    """Opening a RemoteAccess on an unusable URL must fail cleanly."""

    def test_unknown_url(self):
        bogus_url = "bla://"
        self.assertRaises(SubversionException, ra.RemoteAccess, bogus_url)
class TestRemoteAccess(SubversionTestCase):
def setUp(self):
    # Every test gets a fresh repository "d" and a RemoteAccess session on
    # it that authenticates with the plain username provider.
    super(TestRemoteAccess, self).setUp()
    self.repos_url = self.make_repository("d")
    providers = [ra.get_username_provider()]
    auth = ra.Auth(providers)
    self.ra = ra.RemoteAccess(self.repos_url, auth=auth)
def tearDown(self):
del self.ra
super(TestRemoteAccess, self).tearDown()
def commit_editor(self):
return self.get_commit_editor(self.repos_url)
def do_commit(self):
    # Commit a single top-level directory named "foo".
    editor = self.get_commit_editor(self.repos_url)
    editor.add_dir("foo")
    editor.close()
def test_repr(self):
self.assertEqual("RemoteAccess(\"%s\")" % self.repos_url,
repr(self.ra))
def test_latest_revnum(self):
self.assertEqual(0, self.ra.get_latest_revnum())
def test_latest_revnum_one(self):
self.do_commit()
self.assertEqual(1, self.ra.get_latest_revnum())
def test_get_uuid(self):
self.assertIsInstance(self.ra.get_uuid(), str)
def test_get_repos_root(self):
self.assertEqual(self.repos_url, self.ra.get_repos_root())
def test_get_url(self):
    # With an API older than 1.5 the call is expected to be unimplemented.
    if ra.api_version() < (1, 5):
        self.assertRaises(NotImplementedError, self.ra.get_url)
        return
    url = self.ra.get_url()
    self.assertEqual(self.repos_url, url)
def test_reparent(self):
self.ra.reparent(self.repos_url)
def test_has_capability(self):
    # Asking for the bogus capability "FOO" always fails; only the kind of
    # failure depends on the API version.
    if ra.api_version() < (1, 5):
        expected_error = NotImplementedError
    else:
        expected_error = SubversionException
    self.assertRaises(expected_error, self.ra.has_capability, "FOO")
def test_get_dir(self):
ret = self.ra.get_dir("", 0)
self.assertIsInstance(ret, tuple)
def test_get_dir_leading_slash(self):
ret = self.ra.get_dir("/", 0)
self.assertIsInstance(ret, tuple)
def test_get_dir_kind(self):
    self.do_commit()
    # Only the "kind" field of each directory entry is requested.
    listing = self.ra.get_dir("/", 1, fields=ra.DIRENT_KIND)
    (dirents, fetch_rev, props) = listing
    self.assertIsInstance(props, dict)
    self.assertEqual(1, fetch_rev)
    foo_entry = dirents["foo"]
    self.assertEqual(NODE_DIR, foo_entry["kind"])
def test_change_rev_prop(self):
self.do_commit()
self.ra.change_rev_prop(1, "foo", "bar")
def test_rev_proplist(self):
self.assertIsInstance(self.ra.rev_proplist(0), dict)
def test_do_diff(self):
    self.do_commit()

    # Minimal do-nothing editors: every callback is accepted and ignored.
    class MyFileEditor:
        def change_prop(self, name, val):
            pass

        def close(self, checksum=None):
            pass

    class MyDirEditor:
        def change_prop(self, name, val):
            pass

        def add_directory(self, *args):
            return MyDirEditor()

        def add_file(self, *args):
            return MyFileEditor()

        def close(self):
            pass

    class MyEditor:
        def set_target_revision(self, rev):
            pass

        def open_root(self, base_rev):
            return MyDirEditor()

        def close(self):
            pass

    repos_root = self.ra.get_repos_root()
    reporter = self.ra.do_diff(1, "", repos_root, MyEditor())
    reporter.set_path("", 0, True)
    reporter.finish()
    # Once finished, the reporter must refuse any further use.
    self.assertRaises(RuntimeError, reporter.finish)
    self.assertRaises(RuntimeError, reporter.set_path, "", 0, True)
def test_iter_log_invalid(self):
    # The error is expected while the iterator is consumed by list(), not
    # when iter_log() itself is called.
    unknown_path_log = self.ra.iter_log(
        ["idontexist"], 0, 0,
        revprops=["svn:date", "svn:author", "svn:log"])
    self.assertRaises(SubversionException, list, unknown_path_log)
    unknown_rev_log = self.ra.iter_log(
        [""], 0, 1000,
        revprops=["svn:date", "svn:author", "svn:log"])
    self.assertRaises(SubversionException, list, unknown_rev_log)
def test_iter_log(self):
    # Check the log entries produced by iter_log before and after a commit.
    def check_results(returned):
        # Expect exactly two entries (revisions 0 and 1).  Each entry has
        # three elements, or four when has_children is included.
        self.assertEqual(2, len(returned))
        self.assertTrue(len(returned[0]) in (3, 4))
        if len(returned[0]) == 3:
            (paths, revnum, props) = returned[0]
        else:
            (paths, revnum, props, has_children) = returned[0]
        self.assertEqual(None, paths)
        self.assertEqual(revnum, 0)
        # keys() is a view object on Python 3 and never equals a list, so
        # materialise it before comparing.
        self.assertEqual(["svn:date"], list(props.keys()))
        if len(returned[1]) == 3:
            (paths, revnum, props) = returned[1]
        else:
            (paths, revnum, props, has_children) = returned[1]
        # The node kind of the changed path is only reported from API 1.6.
        if ra.api_version() < (1, 6):
            self.assertEqual({'/foo': ('A', None, -1, NODE_UNKNOWN)}, paths)
        else:
            self.assertEqual({'/foo': ('A', None, -1, NODE_DIR)}, paths)
        self.assertEqual(revnum, 1)
        self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
                         set(props.keys()))

    returned = list(self.ra.iter_log(
        [""], 0, 0,
        revprops=["svn:date", "svn:author", "svn:log"]))
    self.assertEqual(1, len(returned))
    self.do_commit()
    returned = list(self.ra.iter_log(
        None, 0, 1, discover_changed_paths=True,
        strict_node_history=False,
        revprops=["svn:date", "svn:author", "svn:log"]))
    check_results(returned)
def test_get_log(self):
    # Check the entries handed to the get_log callback before and after a
    # commit.
    returned = []

    def cb(*args):
        # Collect whatever positional arguments get_log passes in.
        returned.append(args)

    def check_results(returned):
        # Expect exactly two entries (revisions 0 and 1).  Each entry has
        # three elements, or four when has_children is included.
        self.assertEqual(2, len(returned))
        self.assertTrue(len(returned[0]) in (3, 4))
        if len(returned[0]) == 3:
            (paths, revnum, props) = returned[0]
        else:
            (paths, revnum, props, has_children) = returned[0]
        self.assertEqual(None, paths)
        self.assertEqual(revnum, 0)
        # keys() is a view object on Python 3 and never equals a list, so
        # materialise it before comparing.
        self.assertEqual(["svn:date"], list(props.keys()))
        if len(returned[1]) == 3:
            (paths, revnum, props) = returned[1]
        else:
            (paths, revnum, props, has_children) = returned[1]
        self.assertEqual({'/foo': ('A', None, -1)}, paths)
        self.assertEqual(revnum, 1)
        self.assertEqual(set(["svn:date", "svn:author", "svn:log"]),
                         set(props.keys()))

    self.ra.get_log(cb, [""], 0, 0,
                    revprops=["svn:date", "svn:author", "svn:log"])
    self.assertEqual(1, len(returned))
    self.do_commit()
    # Rebinding is visible to cb, which looks the name up in this scope.
    returned = []
    self.ra.get_log(cb, None, 0, 1, discover_changed_paths=True,
                    strict_node_history=False,
                    revprops=["svn:date", "svn:author", "svn:log"])
    check_results(returned)
def test_get_log_cancel(self):
    # An exception raised inside the callback must reach the caller of
    # get_log unchanged.
    def failing_callback(*args):
        raise KeyError

    self.do_commit()
    wanted_revprops = ["svn:date", "svn:author", "svn:log"]
    self.assertRaises(
        KeyError,
        self.ra.get_log, failing_callback, [""], 0, 0,
        revprops=wanted_revprops)
def test_get_commit_editor_double_close(self):
    def ignore_commit(*args):
        pass

    revprops = {"svn:log": "foo"}
    editor = self.ra.get_commit_editor(revprops, ignore_commit)
    root = editor.open_root()
    root.close()
    # Closing an already closed directory is an error...
    self.assertRaises(RuntimeError, root.close)
    editor.close()
    # ...and so is closing or aborting an already closed editor.
    self.assertRaises(RuntimeError, editor.close)
    self.assertRaises(RuntimeError, editor.abort)
def test_get_commit_editor_busy(self):
    def ignore_commit(rev):
        pass

    first_editor = self.ra.get_commit_editor(
        {"svn:log": "foo"}, ignore_commit)
    # While the first editor is still open, asking for a second one on the
    # same session must be refused.
    self.assertRaises(
        ra.BusyException, self.ra.get_commit_editor,
        {"svn:log": "foo"}, ignore_commit)
    first_editor.abort()
def test_get_commit_editor_double_open(self):
    def ignore_commit(rev):
        pass

    revprops = {"svn:log": "foo"}
    editor = self.ra.get_commit_editor(revprops, ignore_commit)
    root = editor.open_root()
    # "somedir" is deliberately left open; adding a sibling while it is
    # still open must be rejected.
    root.add_directory("somedir")
    self.assertRaises(RuntimeError, root.add_directory, "foo")
def test_get_commit_editor_custom_revprops(self):
    # Nothing is checked when the library version is older than 1.5.
    if ra.version()[:2] < (1, 5):
        return

    def ignore_commit(paths, rev, revprops):
        pass

    custom_revprops = {
        "svn:log": "foo",
        "bar:foo": "bla",
        "svn:custom:blie": "bloe",
    }
    editor = self.ra.get_commit_editor(custom_revprops, ignore_commit)
    root = editor.open_root()
    root.add_directory("somedir").close()
    root.close()
    editor.close()
    # The custom properties must appear next to svn:author and svn:date.
    expected_names = set([
        'bar:foo', 'svn:author', 'svn:custom:blie', 'svn:date', 'svn:log'])
    actual_names = set(self.ra.rev_proplist(1).keys())
    self.assertEqual(expected_names, actual_names)
def test_get_commit_editor_context_manager(self):
    def ignore_commit(paths, rev, revprops):
        pass

    editor = self.ra.get_commit_editor({"svn:log": "foo"}, ignore_commit)
    # __enter__ must hand back the very same object at each level.
    self.assertIs(editor, editor.__enter__())
    root = editor.open_root(0)
    subdir = root.add_directory("foo")
    self.assertIs(subdir, subdir.__enter__())
    # Leave the contexts innermost first, as nested "with" blocks would.
    subdir.__exit__(None, None, None)
    root.__exit__(None, None, None)
    editor.__exit__(None, None, None)
def test_get_commit_editor(self):
    def ignore_commit(paths, rev, revprops):
        pass

    # Plain happy path: add one directory and close everything in order.
    editor = self.ra.get_commit_editor({"svn:log": "foo"}, ignore_commit)
    root = editor.open_root(0)
    subdir = root.add_directory("foo")
    subdir.close()
    root.close()
    editor.close()
def test_commit_file_props(self):
cb = self.commit_editor()
f = cb.add_file("bar")
f.modify("a")
f.change_prop("bla:bar", "blie")
cb.close()
cb = self.commit_editor()
f = cb.open_file("bar")
f.change_prop("bla:bar", None)
cb.close()
- stream = StringIO()
+ stream = BytesIO()
props = self.ra.get_file("bar", stream, 1)[1]
self.assertEqual("blie", props.get("bla:bar"))
- stream = StringIO()
+ stream = BytesIO()
props = self.ra.get_file("bar", stream, 2)[1]
self.assertIs(None, props.get("bla:bar"))
def test_get_file_revs(self):
    # First commit: add file "bar" with contents "a".
    editor = self.commit_editor()
    editor.add_file("bar").modify("a")
    editor.close()
    # Second commit: change the contents and set a property.
    editor = self.commit_editor()
    bar = editor.open_file("bar")
    bar.modify("b")
    bar.change_prop("bla", "bloe")
    editor.close()

    seen = []

    def handle(path, rev, props, from_merge=None):
        seen.append((path, rev, props))

    self.ra.get_file_revs("bar", 1, 2, handle)
    self.assertEqual(2, len(seen))
    first, second = seen
    self.assertEqual(1, first[1])
    self.assertEqual(2, second[1])
    self.assertEqual("/bar", first[0])
    self.assertEqual("/bar", second[0])
def test_get_file(self):
cb = self.commit_editor()
cb.add_file("bar").modify("a")
cb.close()
- stream = StringIO()
+ stream = BytesIO()
self.ra.get_file("bar", stream, 1)
stream.seek(0)
self.assertEqual(b"a", stream.read())
- stream = StringIO()
+ stream = BytesIO()
self.ra.get_file("/bar", stream, 1)
stream.seek(0)
self.assertEqual(b"a", stream.read())
def test_get_locations_root(self):
self.assertEqual({0: "/"}, self.ra.get_locations("", 0, [0]))
def test_check_path(self):
    editor = self.commit_editor()
    editor.add_dir("bar")
    editor.close()
    # A trailing slash must not change the answer for an existing directory.
    for existing in ("bar", "bar/"):
        self.assertEqual(NODE_DIR, self.ra.check_path(existing, 1))
    self.assertEqual(NODE_NONE, self.ra.check_path("blaaaa", 1))
def test_check_path_with_unsafe_path(self):
# The SVN code asserts that paths do not have a leading '/'... And if
# that assertion fires, it calls exit(1). That sucks. Make sure it
# doesn't happen.
self.assertRaises(ValueError, self.ra.check_path, "/bar", 1)
self.assertRaises(ValueError, self.ra.check_path, "///bar", 1)
def test_stat(self):
    editor = self.commit_editor()
    editor.add_dir("bar")
    editor.close()
    dirent = self.ra.stat("bar", 1)
    # Only the set of reported field names is checked, not their values.
    expected_fields = set([
        'last_author', 'kind', 'created_rev', 'has_props', 'time', 'size'])
    self.assertEqual(expected_fields, set(dirent.keys()))
def test_get_locations_dir(self):
    # r1: add "bar"; r2: add "bla" as a copy of bar@1; r3: delete "bar".
    editor = self.commit_editor()
    editor.add_dir("bar")
    editor.close()
    editor = self.commit_editor()
    editor.add_dir("bla", "bar", 1)
    editor.close()
    editor = self.commit_editor()
    editor.delete("bar")
    editor.close()

    locations = self.ra.get_locations("bla", 2, [1, 2])
    self.assertEqual({1: "/bar", 2: "/bla"}, locations)
    locations = self.ra.get_locations("bar", 1, [1, 2])
    self.assertEqual({1: "/bar", 2: "/bar"}, locations)
    locations = self.ra.get_locations("bar", 2, [1, 2])
    self.assertEqual({1: "/bar", 2: "/bar"}, locations)
    locations = self.ra.get_locations("bla", 3, [1, 2, 3])
    self.assertEqual({1: "/bar", 2: "/bla", 3: "/bla"}, locations)
def test_get_locations_dir_with_unsafe_path(self):
# Make sure that an invalid path won't trip an assertion error
self.assertRaises(ValueError, self.ra.get_locations, "//bla", 2, [1,2])
class AuthTests(TestCase):
    """Tests for ra.Auth and the credential providers it is built from."""

    def test_not_list(self):
        # Passing a bare provider instead of a list of providers is rejected.
        provider = ra.get_simple_provider()
        self.assertRaises(TypeError, ra.Auth, provider)

    def test_not_registered(self):
        auth = ra.Auth([])
        self.assertRaises(
            SubversionException, auth.credentials, "svn.simple", "MyRealm")

    def test_simple(self):
        def prompt(realm, uname, may_save):
            return ("foo", "geheim", False)

        auth = ra.Auth([ra.get_simple_prompt_provider(prompt, 0)])
        creds = auth.credentials("svn.simple", "MyRealm")
        self.assertEqual(("foo", "geheim", 0), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_username(self):
        def prompt(realm, may_save):
            return ("somebody", False)

        auth = ra.Auth([ra.get_username_prompt_provider(prompt, 0)])
        creds = auth.credentials("svn.username", "MyRealm")
        self.assertEqual(("somebody", 0), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_client_cert(self):
        def prompt(realm, may_save):
            return ("filename", False)

        auth = ra.Auth([ra.get_ssl_client_cert_prompt_provider(prompt, 0)])
        creds = auth.credentials("svn.ssl.client-cert", "MyRealm")
        self.assertEqual(("filename", False), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_client_cert_pw(self):
        def prompt(realm, may_save):
            return ("supergeheim", False)

        auth = ra.Auth([ra.get_ssl_client_cert_pw_prompt_provider(prompt, 0)])
        creds = auth.credentials("svn.ssl.client-passphrase", "MyRealm")
        self.assertEqual(("supergeheim", False), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_server_trust(self):
        def prompt(realm, failures, certinfo, may_save):
            return (42, False)

        auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(prompt)])
        auth.set_parameter("svn:auth:ssl:failures", 23)
        creds = auth.credentials("svn.ssl.server", "MyRealm")
        self.assertEqual((42, 0), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_server_untrust(self):
        # A prompt that returns None yields no credentials at all.
        def prompt(realm, failures, certinfo, may_save):
            return None

        auth = ra.Auth([ra.get_ssl_server_trust_prompt_provider(prompt)])
        auth.set_parameter("svn:auth:ssl:failures", 23)
        creds = auth.credentials("svn.ssl.server", "MyRealm")
        self.assertRaises(StopIteration, next, creds)

    def test_retry(self):
        # With a retry argument of 2 the prompt is consulted three times
        # before the credentials iterator is exhausted.
        self.i = 0

        def inc_foo(realm, may_save):
            self.i += 1
            return ("somebody%d" % self.i, False)

        auth = ra.Auth([ra.get_username_prompt_provider(inc_foo, 2)])
        creds = auth.credentials("svn.username", "MyRealm")
        for expected_name in ("somebody1", "somebody2", "somebody3"):
            self.assertEqual((expected_name, 0), next(creds))
        self.assertRaises(StopIteration, next, creds)

    def test_set_default_username(self):
        auth = ra.Auth([])
        auth.set_parameter("svn:auth:username", "foo")
        stored = auth.get_parameter("svn:auth:username")
        self.assertEqual("foo", stored)

    def test_set_default_password(self):
        auth = ra.Auth([])
        auth.set_parameter("svn:auth:password", "bar")
        stored = auth.get_parameter("svn:auth:password")
        self.assertEqual("bar", stored)

    def test_platform_auth_providers(self):
        # Only checks that the platform providers are accepted by Auth.
        providers = ra.get_platform_specific_client_providers()
        ra.Auth(providers)
diff --git a/subvertpy/tests/test_repos.py b/subvertpy/tests/test_repos.py
index fb8fdebf..cfb30532 100644
--- a/subvertpy/tests/test_repos.py
+++ b/subvertpy/tests/test_repos.py
@@ -1,148 +1,148 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subversion repository library tests."""
-from cStringIO import StringIO
+from io import BytesIO
import os
import textwrap
from subvertpy import repos, SubversionException
from subvertpy.tests import TestCaseInTempDir, TestCase
class VersionTest(TestCase):
    """Sanity checks on the version tuples exposed by the repos module."""

    def test_version_length(self):
        version = repos.version()
        self.assertEqual(4, len(version))

    def test_api_version_length(self):
        api_version = repos.api_version()
        self.assertEqual(4, len(api_version))

    def test_api_version_later_same(self):
        # The API version must not be greater than the library version.
        api_version = repos.api_version()
        version = repos.version()
        self.assertTrue(api_version <= version)
class TestRepository(TestCaseInTempDir):
def setUp(self):
super(TestRepository, self).setUp()
def test_create(self):
repos.create(os.path.join(self.test_dir, "foo"))
def test_capability(self):
    repo_path = os.path.join(self.test_dir, "foo")
    r = repos.create(repo_path)
    # With an API older than 1.5 the call is expected to be unimplemented.
    if repos.api_version() < (1, 5):
        self.assertRaises(NotImplementedError, r.has_capability, "mergeinfo")
        return
    supported = r.has_capability("mergeinfo")
    self.assertIsInstance(supported, bool)
def test_verify_fs(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
- f = StringIO()
+ f = BytesIO()
r.verify_fs(f, 0, 0)
self.assertEqual(b'* Verified revision 0.\n', f.getvalue())
def test_open(self):
repos.create(os.path.join(self.test_dir, "foo"))
repos.Repository("foo")
def test_uuid(self):
repos.create(os.path.join(self.test_dir, "foo"))
self.assertIsInstance(repos.Repository("foo").fs().get_uuid(), str)
def test_youngest_rev(self):
repos.create(os.path.join(self.test_dir, "foo"))
self.assertEqual(0, repos.Repository("foo").fs().youngest_revision())
def test_rev_root(self):
repos.create(os.path.join(self.test_dir, "foo"))
self.assertTrue(repos.Repository("foo").fs().revision_root(0) is not None)
def test_load_fs_invalid(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
- dumpfile = "Malformed"
- feedback = StringIO()
- self.assertRaises(SubversionException, r.load_fs, StringIO(dumpfile),
+ dumpfile = b"Malformed"
+ feedback = BytesIO()
+ self.assertRaises(SubversionException, r.load_fs, BytesIO(dumpfile),
feedback, repos.LOAD_UUID_DEFAULT)
def test_load_fs(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
dumpfile = textwrap.dedent("""\
SVN-fs-dump-format-version: 2
UUID: 38f0a982-fd1f-4e00-aa6b-a20720f4b9ca
Revision-number: 0
Prop-content-length: 56
Content-length: 56
K 8
svn:date
V 27
2011-08-26T13:08:30.187858Z
PROPS-END
- """)
- feedback = StringIO()
- r.load_fs(StringIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT)
+ """).encode("ascii")
+ feedback = BytesIO()
+ r.load_fs(BytesIO(dumpfile), feedback, repos.LOAD_UUID_DEFAULT)
self.assertEqual(r.fs().get_uuid(), "38f0a982-fd1f-4e00-aa6b-a20720f4b9ca")
def test_rev_props(self):
    # Revision 0 of a freshly created repository carries only svn:date.
    repos.create(os.path.join(self.test_dir, "foo"))
    revprops = repos.Repository("foo").fs().revision_proplist(0)
    # keys() is a view object on Python 3 and never equals a list, so
    # materialise it before comparing.
    self.assertEqual(["svn:date"], list(revprops.keys()))
def test_rev_root_invalid(self):
repos.create(os.path.join(self.test_dir, "foo"))
self.assertRaises(SubversionException, repos.Repository("foo").fs().revision_root, 1)
def test_pack_fs(self):
r = repos.create(os.path.join(self.test_dir, "foo"))
r.pack_fs()
def test_paths_changed(self):
repos.create(os.path.join(self.test_dir, "foo"))
root = repos.Repository("foo").fs().revision_root(0)
self.assertEqual({}, root.paths_changed())
def test_is_dir(self):
    repos.create(os.path.join(self.test_dir, "foo"))
    fs = repos.Repository("foo").fs()
    root = fs.revision_root(0)
    # The repository root is a directory; a missing path is not.
    self.assertEqual(True, root.is_dir(""))
    self.assertEqual(False, root.is_dir("nonexistant"))
def test_is_file(self):
    repos.create(os.path.join(self.test_dir, "foo"))
    fs = repos.Repository("foo").fs()
    root = fs.revision_root(0)
    # Neither the repository root nor a missing path is a file.
    self.assertEqual(False, root.is_file(""))
    self.assertEqual(False, root.is_file("nonexistant"))
class StreamTests(TestCase):
    """Tests for reading from and writing to a bare repos.Stream."""

    def test_read(self):
        stream = repos.Stream()
        if repos.api_version() >= (1, 6):
            # An empty stream reads as empty bytes, with or without a size.
            self.assertEqual(b"", stream.read())
            self.assertEqual(b"", stream.read(15))
        else:
            self.assertRaises(NotImplementedError, stream.read)
        stream.close()

    def test_write(self):
        stream = repos.Stream()
        # write() is expected to return the number of characters accepted.
        written = stream.write("")
        self.assertEqual(0, written)
        written = stream.write("ab")
        self.assertEqual(2, written)
        stream.close()
diff --git a/subvertpy/tests/test_wc.py b/subvertpy/tests/test_wc.py
index 12b73d73..81f5bd0c 100644
--- a/subvertpy/tests/test_wc.py
+++ b/subvertpy/tests/test_wc.py
@@ -1,307 +1,307 @@
# Copyright (C) 2005-2007 Jelmer Vernooij <jelmer@samba.org>
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Subversion wc library tests."""
-from StringIO import StringIO
+from io import BytesIO
import os
import subvertpy
from subvertpy import (
NODE_FILE,
wc,
)
from subvertpy.tests import (
SubversionTestCase,
TestCase,
SkipTest,
)
class VersionTest(TestCase):
    """Sanity checks on the version tuples exposed by the wc module."""

    def test_version_length(self):
        version = wc.version()
        self.assertEqual(4, len(version))

    def test_api_version_length(self):
        api_version = wc.api_version()
        self.assertEqual(4, len(api_version))

    def test_api_version_later_same(self):
        # The API version must not be greater than the library version.
        api_version = wc.api_version()
        version = wc.version()
        self.assertTrue(api_version <= version)
class WorkingCopyTests(TestCase):
    """Tests for module-level wc helpers that need no working copy."""

    def test_get_adm_dir(self):
        adm_dir = wc.get_adm_dir()
        self.assertEqual(".svn", adm_dir)

    def test_set_adm_dir(self):
        # The name is process-wide state, so always restore the old value.
        previous = wc.get_adm_dir()
        try:
            wc.set_adm_dir("_svn")
            self.assertEqual("_svn", wc.get_adm_dir())
        finally:
            wc.set_adm_dir(previous)

    def test_is_normal_prop(self):
        result = wc.is_normal_prop("svn:ignore")
        self.assertTrue(result)

    def test_is_entry_prop(self):
        result = wc.is_entry_prop("svn:entry:foo")
        self.assertTrue(result)

    def test_is_wc_prop(self):
        result = wc.is_wc_prop("svn:wc:foo")
        self.assertTrue(result)

    def test_match_ignore_list(self):
        if wc.api_version() < (1, 5):
            self.assertRaises(
                NotImplementedError, wc.match_ignore_list, "foo", [])
            return  # Skip test
        # Patterns that match "foo": a glob and the literal name.
        for patterns in (["f*"], ["foo"]):
            self.assertTrue(wc.match_ignore_list("foo", patterns))
        # Patterns that do not: the empty list and an unrelated name.
        for patterns in ([], ["bar"]):
            self.assertFalse(wc.match_ignore_list("foo", patterns))
class WcTests(SubversionTestCase):
    """Tests for wc.revision_status on a freshly created checkout."""

    def test_revision_status(self):
        # make_client is called only to set up the "checkout" working copy;
        # the URL it returns is not needed here.
        self.make_client("repos", "checkout")
        ret = wc.revision_status("checkout")
        self.assertEqual((0, 0, 0, 0), ret)

    def test_revision_status_trailing(self):
        # Same check, with a trailing slash on the working copy path.
        self.make_client("repos", "checkout")
        ret = wc.revision_status("checkout/")
        self.assertEqual((0, 0, 0, 0), ret)
class AdmTests(SubversionTestCase):
def setUp(self):
    super(AdmTests, self).setUp()
    # Every test in this class needs wc.WorkingCopy; skip when the binding
    # does not provide it.
    working_copy_cls = getattr(wc, "WorkingCopy", None)
    if working_copy_cls is None:
        raise SkipTest(
            "Subversion 1.7 API for WorkingCopy not yet supported")
def test_has_binary_prop(self):
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "\x00\x01"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout")
    bar_path = os.path.join(self.test_dir, "checkout/bar")
    # The freshly added file is expected not to be flagged as binary.
    is_binary = adm.has_binary_prop(bar_path)
    self.assertFalse(is_binary)
    adm.close()
def test_get_ancestry(self):
    # A newly added (uncommitted) file reports its future URL and revision 0.
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "\x00\x01"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout")
    # The unused absolute-path local from the original was dropped;
    # get_ancestry is queried with the relative path.
    expected = ("%s/bar" % repos_url, 0)
    self.assertEqual(expected, adm.get_ancestry("checkout/bar"))
    adm.close()
def test_maybe_set_repos_root(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout")
adm.maybe_set_repos_root(os.path.join(self.test_dir, "checkout"), repos_url)
adm.close()
def test_add_repos_file(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
- adm.add_repos_file("checkout/bar", StringIO("basecontents"), StringIO("contents"), {}, {})
+ adm.add_repos_file("checkout/bar", BytesIO(b"basecontents"), BytesIO(b"contents"), {}, {})
self.assertEqual(b"basecontents", wc.get_pristine_contents("checkout/bar").read())
def test_mark_missing_deleted(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "\x00\x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
os.remove("checkout/bar")
adm.mark_missing_deleted("checkout/bar")
self.assertFalse(os.path.exists("checkout/bar"))
def test_remove_from_revision_control(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "\x00\x01"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout", True)
adm.remove_from_revision_control("bar")
self.assertTrue(os.path.exists("checkout/bar"))
def test_relocate(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
adm.relocate("checkout", "file://", "http://")
def test_translated_stream(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
self.client_commit("checkout", "foo")
adm = wc.WorkingCopy(None, "checkout", True)
path = os.path.join(self.test_dir, "checkout/bar")
stream = adm.translated_stream(path, path, wc.TRANSLATE_TO_NF)
self.assertTrue(stream.read().startswith(b"My id: $Id: "))
def test_text_modified(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
self.client_commit("checkout", "foo")
adm = wc.WorkingCopy(None, "checkout")
self.assertFalse(adm.text_modified("checkout/bar"))
self.build_tree({"checkout/bar": "gambon"})
self.assertTrue(adm.text_modified("checkout/bar", True))
def test_props_modified(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "My id: $Id$"})
self.client_add('checkout/bar')
self.client_set_prop("checkout/bar", "svn:keywords", "Id\n")
self.client_commit("checkout", "foo")
adm = wc.WorkingCopy(None, "checkout", True)
self.assertFalse(adm.props_modified("checkout/bar"))
adm.prop_set("aprop", "avalue", "checkout/bar")
self.assertTrue(adm.props_modified("checkout/bar"))
def test_prop_set(self):
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "file"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout", True)
    target = "checkout/bar"
    # Setting a value makes it readable again through prop_get...
    adm.prop_set("aprop", "avalue", target)
    self.assertEqual(adm.prop_get("aprop", target), "avalue")
    # ...and setting None removes the property.
    adm.prop_set("aprop", None, target)
    self.assertEqual(adm.prop_get("aprop", target), None)
def test_committed_queue(self):
if getattr(wc, "CommittedQueue", None) is None:
raise SkipTest("CommittedQueue not available")
cq = wc.CommittedQueue()
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout", True)
adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
def test_entry_not_found(self):
repos_url = self.make_client("repos", "checkout")
adm = wc.WorkingCopy(None, "checkout")
self.assertRaises(KeyError, adm.entry, "bar")
def test_entry(self):
    # Check the entry of an added file before and after it is committed.
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "\x00\x01"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout")
    entry = adm.entry("checkout/bar")
    # Added but not yet committed: the entry is at revision 0.
    self.assertEqual("bar", entry.name)
    self.assertEqual(NODE_FILE, entry.kind)
    self.assertEqual(0, entry.revision)
    self.client_commit("checkout", "msg")
    # The working copy is opened again after the commit before the entry
    # is re-read.
    adm = wc.WorkingCopy(None, "checkout")
    entry = adm.entry("checkout/bar")
    self.assertEqual("bar", entry.name)
    self.assertEqual(NODE_FILE, entry.kind)
    self.assertEqual(1, entry.revision)
def test_get_actual_target(self):
repos_url = self.make_client("repos", ".")
self.assertEqual((self.test_dir, "bla"),
wc.get_actual_target("%s/bla" % self.test_dir))
def test_is_wc_root(self):
repos_url = self.make_client("repos", ".")
self.build_tree({"bar": None})
self.client_add('bar')
adm = wc.WorkingCopy(None, ".")
self.assertTrue(adm.is_wc_root(self.test_dir))
self.assertFalse(adm.is_wc_root(os.path.join(self.test_dir, "bar")))
def test_status(self):
repos_url = self.make_client("repos", "checkout")
self.build_tree({"checkout/bar": "text"})
self.client_add('checkout/bar')
adm = wc.WorkingCopy(None, "checkout")
self.assertEqual(wc.STATUS_ADDED, adm.status('bar').status)
self.client_commit("checkout", "foo")
adm = wc.WorkingCopy(None, "checkout")
self.assertEqual(wc.STATUS_NORMAL, adm.status('bar').status)
def test_transmit_text_deltas(self):
    # Transmit the text delta of an added file to a recording editor, then
    # process a committed queue and check the resulting entry.
    repos_url = self.make_client("repos", ".")
    self.build_tree({"bar": "blala"})
    self.client_add('bar')
    adm = wc.WorkingCopy(None, ".", True)
    class Editor(object):
        """Editor"""
        def __init__(self):
            # Every window passed to the handler is recorded here.
            self._windows = []
        def apply_textdelta(self, checksum):
            # Returns the callable that receives the delta windows.
            def window_handler(window):
                self._windows.append(window)
            return window_handler
        def close(self):
            pass
    editor = Editor()
    (tmpfile, digest) = adm.transmit_text_deltas("bar", True, editor)
    # One window carrying the five new bytes, followed by a final None.
    self.assertEqual(editor._windows, [(0, 0, 5, 0, [(2, 0, 5)], b'blala'), None])
    self.assertIsInstance(tmpfile, str)
    # 16 bytes — presumably an MD5 digest; confirm against the binding.
    self.assertEqual(16, len(digest))
    bar = adm.entry("bar")
    # Before the queue is processed the entry has no committed revision.
    self.assertEqual(-1, bar.cmt_rev)
    self.assertEqual(0, bar.revision)
    cq = wc.CommittedQueue()
    cq.queue("bar", adm)
    adm.process_committed_queue(cq, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
    bar = adm.entry("bar")
    # After processing, the entry is at revision 1 with a normal schedule.
    self.assertEqual("bar", bar.name)
    self.assertEqual(NODE_FILE, bar.kind)
    self.assertEqual(wc.SCHEDULE_NORMAL, bar.schedule)
    self.assertIs(None, bar.checksum)
    self.assertEqual(1, bar.cmt_rev)
    self.assertEqual(1, bar.revision)
def test_process_committed_queue(self):
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "la"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout", True)
    queue = wc.CommittedQueue()
    # The file is queued by absolute path here.
    bar_path = os.path.join(self.test_dir, "checkout/bar")
    queue.queue(bar_path, adm)
    adm.process_committed_queue(
        queue, 1, "2010-05-31T08:49:22.430000Z", "jelmer")
    entry = adm.entry("checkout/bar")
    self.assertEqual("bar", entry.name)
    self.assertEqual(NODE_FILE, entry.kind)
    # The entry is expected to remain scheduled for addition.
    self.assertEqual(wc.SCHEDULE_ADD, entry.schedule)
def test_probe_try(self):
    repos_url = self.make_client("repos", "checkout")
    self.build_tree({"checkout/bar": "la"})
    self.client_add('checkout/bar')
    adm = wc.WorkingCopy(None, "checkout", True)
    # Probing the test directory must give None or fail with "not a working
    # copy"; any other Subversion error is propagated.
    try:
        self.assertIs(None, adm.probe_try(self.test_dir))
    except subvertpy.SubversionException as exc:
        (_, error_code) = exc.args
        if error_code != subvertpy.ERR_WC_NOT_WORKING_COPY:
            raise
    # Probing a file inside the checkout resolves to the checkout itself.
    bar_path = os.path.join("checkout", "bar")
    self.assertEqual("checkout", adm.probe_try(bar_path).access_path())

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:45 PM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3452564

Event Timeline