diff --git a/dulwich/file.py b/dulwich/file.py
index 96275a8c..c364d429 100644
--- a/dulwich/file.py
+++ b/dulwich/file.py
@@ -1,191 +1,192 @@
# file.py -- Safe access to git files
# Copyright (C) 2010 Google, Inc.
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Safe access to git files."""
import errno
import io
import os
import sys
import tempfile
def ensure_dir_exists(dirname):
"""Ensure a directory exists, creating if necessary."""
try:
os.makedirs(dirname)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def _fancy_rename(oldname, newname):
"""Rename file with temporary backup file to rollback if rename fails"""
if not os.path.exists(newname):
try:
os.rename(oldname, newname)
except OSError:
raise
return
# destination file exists
try:
(fd, tmpfile) = tempfile.mkstemp(".tmp", prefix=oldname+".", dir=".")
os.close(fd)
os.remove(tmpfile)
except OSError:
# either file could not be created (e.g. permission problem)
# or could not be deleted (e.g. rude virus scanner)
raise
try:
os.rename(newname, tmpfile)
except OSError:
raise # no rename occurred
try:
os.rename(oldname, newname)
except OSError:
os.rename(tmpfile, newname)
raise
os.remove(tmpfile)
def GitFile(filename, mode='rb', bufsize=-1):
"""Create a file object that obeys the git file locking protocol.
:return: a builtin file object or a _GitFile object
:note: See _GitFile for a description of the file locking protocol.
Only read-only and write-only (binary) modes are supported; r+, w+, and a
are not. To read and write from the same file, you can take advantage of
the fact that opening a file for write does not actually open the file you
request.
"""
if 'a' in mode:
raise IOError('append mode not supported for Git files')
if '+' in mode:
raise IOError('read/write mode not supported for Git files')
if 'b' not in mode:
raise IOError('text mode not supported for Git files')
if 'w' in mode:
return _GitFile(filename, mode, bufsize)
else:
return io.open(filename, mode, bufsize)
class FileLocked(Exception):
"""File is already locked."""
def __init__(self, filename, lockfilename):
self.filename = filename
self.lockfilename = lockfilename
super(FileLocked, self).__init__(filename, lockfilename)
class _GitFile(object):
"""File that follows the git locking protocol for writes.
All writes to a file foo will be written into foo.lock in the same
directory, and the lockfile will be renamed to overwrite the original file
on close.
:note: You *must* call close() or abort() on a _GitFile for the lock to be
released. Typically this will happen in a finally block.
"""
PROXY_PROPERTIES = set(['closed', 'encoding', 'errors', 'mode', 'name',
'newlines', 'softspace'])
PROXY_METHODS = ('__iter__', 'flush', 'fileno', 'isatty', 'read',
'readline', 'readlines', 'seek', 'tell',
'truncate', 'write', 'writelines')
def __init__(self, filename, mode, bufsize):
self._filename = filename
self._lockfilename = '%s.lock' % self._filename
try:
fd = os.open(
self._lockfilename,
- os.O_RDWR | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0))
+ os.O_RDWR | os.O_CREAT | os.O_EXCL |
+ getattr(os, "O_BINARY", 0))
except OSError as e:
if e.errno == errno.EEXIST:
raise FileLocked(filename, self._lockfilename)
raise
self._file = os.fdopen(fd, mode, bufsize)
self._closed = False
for method in self.PROXY_METHODS:
setattr(self, method, getattr(self._file, method))
def abort(self):
"""Close and discard the lockfile without overwriting the target.
If the file is already closed, this is a no-op.
"""
if self._closed:
return
self._file.close()
try:
os.remove(self._lockfilename)
self._closed = True
except OSError as e:
# The file may have been removed already, which is ok.
if e.errno != errno.ENOENT:
raise
self._closed = True
def close(self):
"""Close this file, saving the lockfile over the original.
:note: If this method fails, it will attempt to delete the lockfile.
However, it is not guaranteed to do so (e.g. if a filesystem
becomes suddenly read-only), which will prevent future writes to
this file until the lockfile is removed manually.
:raises OSError: if the original file could not be overwritten. The
lock file is still closed, so further attempts to write to the same
file object will raise ValueError.
"""
if self._closed:
return
os.fsync(self._file.fileno())
self._file.close()
try:
try:
os.rename(self._lockfilename, self._filename)
except OSError as e:
if sys.platform == 'win32' and e.errno == errno.EEXIST:
# Windows versions prior to Vista don't support atomic
# renames
_fancy_rename(self._lockfilename, self._filename)
else:
raise
finally:
self.abort()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def __getattr__(self, name):
"""Proxy property calls to the underlying file."""
if name in self.PROXY_PROPERTIES:
return getattr(self._file, name)
raise AttributeError(name)
diff --git a/dulwich/tests/test_file.py b/dulwich/tests/test_file.py
index 9f937cc4..12df38a1 100644
--- a/dulwich/tests/test_file.py
+++ b/dulwich/tests/test_file.py
@@ -1,215 +1,214 @@
# test_file.py -- Test for git files
# Copyright (C) 2010 Google, Inc.
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
-import errno
import io
import os
import shutil
import sys
import tempfile
from dulwich.file import FileLocked, GitFile, _fancy_rename
from dulwich.tests import (
SkipTest,
TestCase,
)
class FancyRenameTests(TestCase):
def setUp(self):
super(FancyRenameTests, self).setUp()
self._tempdir = tempfile.mkdtemp()
self.foo = self.path('foo')
self.bar = self.path('bar')
self.create(self.foo, b'foo contents')
def tearDown(self):
shutil.rmtree(self._tempdir)
super(FancyRenameTests, self).tearDown()
def path(self, filename):
return os.path.join(self._tempdir, filename)
def create(self, path, contents):
f = open(path, 'wb')
f.write(contents)
f.close()
def test_no_dest_exists(self):
self.assertFalse(os.path.exists(self.bar))
_fancy_rename(self.foo, self.bar)
self.assertFalse(os.path.exists(self.foo))
new_f = open(self.bar, 'rb')
self.assertEqual(b'foo contents', new_f.read())
new_f.close()
def test_dest_exists(self):
self.create(self.bar, b'bar contents')
_fancy_rename(self.foo, self.bar)
self.assertFalse(os.path.exists(self.foo))
new_f = open(self.bar, 'rb')
self.assertEqual(b'foo contents', new_f.read())
new_f.close()
def test_dest_opened(self):
if sys.platform != "win32":
raise SkipTest("platform allows overwriting open files")
self.create(self.bar, b'bar contents')
dest_f = open(self.bar, 'rb')
self.assertRaises(OSError, _fancy_rename, self.foo, self.bar)
dest_f.close()
self.assertTrue(os.path.exists(self.path('foo')))
new_f = open(self.foo, 'rb')
self.assertEqual(b'foo contents', new_f.read())
new_f.close()
new_f = open(self.bar, 'rb')
self.assertEqual(b'bar contents', new_f.read())
new_f.close()
class GitFileTests(TestCase):
def setUp(self):
super(GitFileTests, self).setUp()
self._tempdir = tempfile.mkdtemp()
f = open(self.path('foo'), 'wb')
f.write(b'foo contents')
f.close()
def tearDown(self):
shutil.rmtree(self._tempdir)
super(GitFileTests, self).tearDown()
def path(self, filename):
return os.path.join(self._tempdir, filename)
def test_invalid(self):
foo = self.path('foo')
self.assertRaises(IOError, GitFile, foo, mode='r')
self.assertRaises(IOError, GitFile, foo, mode='ab')
self.assertRaises(IOError, GitFile, foo, mode='r+b')
self.assertRaises(IOError, GitFile, foo, mode='w+b')
self.assertRaises(IOError, GitFile, foo, mode='a+bU')
def test_readonly(self):
f = GitFile(self.path('foo'), 'rb')
self.assertTrue(isinstance(f, io.IOBase))
self.assertEqual(b'foo contents', f.read())
self.assertEqual(b'', f.read())
f.seek(4)
self.assertEqual(b'contents', f.read())
f.close()
def test_default_mode(self):
f = GitFile(self.path('foo'))
self.assertEqual(b'foo contents', f.read())
f.close()
def test_write(self):
foo = self.path('foo')
foo_lock = '%s.lock' % foo
orig_f = open(foo, 'rb')
self.assertEqual(orig_f.read(), b'foo contents')
orig_f.close()
self.assertFalse(os.path.exists(foo_lock))
f = GitFile(foo, 'wb')
self.assertFalse(f.closed)
self.assertRaises(AttributeError, getattr, f, 'not_a_file_property')
self.assertTrue(os.path.exists(foo_lock))
f.write(b'new stuff')
f.seek(4)
f.write(b'contents')
f.close()
self.assertFalse(os.path.exists(foo_lock))
new_f = open(foo, 'rb')
self.assertEqual(b'new contents', new_f.read())
new_f.close()
def test_open_twice(self):
foo = self.path('foo')
f1 = GitFile(foo, 'wb')
f1.write(b'new')
try:
f2 = GitFile(foo, 'wb')
self.fail()
except FileLocked as e:
pass
else:
f2.close()
f1.write(b' contents')
f1.close()
# Ensure trying to open twice doesn't affect original.
f = open(foo, 'rb')
self.assertEqual(b'new contents', f.read())
f.close()
def test_abort(self):
foo = self.path('foo')
foo_lock = '%s.lock' % foo
orig_f = open(foo, 'rb')
self.assertEqual(orig_f.read(), b'foo contents')
orig_f.close()
f = GitFile(foo, 'wb')
f.write(b'new contents')
f.abort()
self.assertTrue(f.closed)
self.assertFalse(os.path.exists(foo_lock))
new_orig_f = open(foo, 'rb')
self.assertEqual(new_orig_f.read(), b'foo contents')
new_orig_f.close()
def test_abort_close(self):
foo = self.path('foo')
f = GitFile(foo, 'wb')
f.abort()
try:
f.close()
except (IOError, OSError):
self.fail()
f = GitFile(foo, 'wb')
f.close()
try:
f.abort()
except (IOError, OSError):
self.fail()
def test_abort_close_removed(self):
foo = self.path('foo')
f = GitFile(foo, 'wb')
f._file.close()
os.remove(foo+".lock")
f.abort()
self.assertTrue(f._closed)