Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8391704
index.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
index.py
View Options
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Parser for the git index file format."""
import
collections
import
os
import
stat
import
struct
import
sys
from
dulwich.file
import
GitFile
from
dulwich.objects
import
(
Blob
,
S_IFGITLINK
,
S_ISGITLINK
,
Tree
,
hex_to_sha
,
sha_to_hex
,
)
from
dulwich.pack
import
(
SHA1Reader
,
SHA1Writer
,
)
IndexEntry
=
collections
.
namedtuple
(
'IndexEntry'
,
[
'ctime'
,
'mtime'
,
'dev'
,
'ino'
,
'mode'
,
'uid'
,
'gid'
,
'size'
,
'sha'
,
'flags'
])
FLAG_STAGEMASK
=
0x3000
FLAG_VALID
=
0x8000
FLAG_EXTENDED
=
0x4000
def
pathsplit
(
path
):
"""Split a /-delimited path into a directory part and a basename.
Args:
path: The path to split.
Returns:
Tuple with directory name and basename
"""
try
:
(
dirname
,
basename
)
=
path
.
rsplit
(
b
"/"
,
1
)
except
ValueError
:
return
(
b
""
,
path
)
else
:
return
(
dirname
,
basename
)
def
pathjoin
(
*
args
):
"""Join a /-delimited path.
"""
return
b
"/"
.
join
([
p
for
p
in
args
if
p
])
def
read_cache_time
(
f
):
"""Read a cache time.
Args:
f: File-like object to read from
Returns:
Tuple with seconds and nanoseconds
"""
return
struct
.
unpack
(
">LL"
,
f
.
read
(
8
))
def
write_cache_time
(
f
,
t
):
"""Write a cache time.
Args:
f: File-like object to write to
t: Time to write (as int, float or tuple with secs and nsecs)
"""
if
isinstance
(
t
,
int
):
t
=
(
t
,
0
)
elif
isinstance
(
t
,
float
):
(
secs
,
nsecs
)
=
divmod
(
t
,
1.0
)
t
=
(
int
(
secs
),
int
(
nsecs
*
1000000000
))
elif
not
isinstance
(
t
,
tuple
):
raise
TypeError
(
t
)
f
.
write
(
struct
.
pack
(
">LL"
,
*
t
))
def
read_cache_entry
(
f
):
"""Read an entry from a cache file.
Args:
f: File-like object to read from
Returns:
tuple with: device, inode, mode, uid, gid, size, sha, flags
"""
beginoffset
=
f
.
tell
()
ctime
=
read_cache_time
(
f
)
mtime
=
read_cache_time
(
f
)
(
dev
,
ino
,
mode
,
uid
,
gid
,
size
,
sha
,
flags
,
)
=
\
struct
.
unpack
(
">LLLLLL20sH"
,
f
.
read
(
20
+
4
*
6
+
2
))
name
=
f
.
read
((
flags
&
0x0fff
))
# Padding:
real_size
=
((
f
.
tell
()
-
beginoffset
+
8
)
&
~
7
)
f
.
read
((
beginoffset
+
real_size
)
-
f
.
tell
())
return
(
name
,
ctime
,
mtime
,
dev
,
ino
,
mode
,
uid
,
gid
,
size
,
sha_to_hex
(
sha
),
flags
&
~
0x0fff
)
def
write_cache_entry
(
f
,
entry
):
"""Write an index entry to a file.
Args:
f: File object
entry: Entry to write, tuple with:
(name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags)
"""
beginoffset
=
f
.
tell
()
(
name
,
ctime
,
mtime
,
dev
,
ino
,
mode
,
uid
,
gid
,
size
,
sha
,
flags
)
=
entry
write_cache_time
(
f
,
ctime
)
write_cache_time
(
f
,
mtime
)
flags
=
len
(
name
)
|
(
flags
&
~
0x0fff
)
f
.
write
(
struct
.
pack
(
b
'>LLLLLL20sH'
,
dev
&
0xFFFFFFFF
,
ino
&
0xFFFFFFFF
,
mode
,
uid
,
gid
,
size
,
hex_to_sha
(
sha
),
flags
))
f
.
write
(
name
)
real_size
=
((
f
.
tell
()
-
beginoffset
+
8
)
&
~
7
)
f
.
write
(
b
'
\0
'
*
((
beginoffset
+
real_size
)
-
f
.
tell
()))
def
read_index
(
f
):
"""Read an index file, yielding the individual entries."""
header
=
f
.
read
(
4
)
if
header
!=
b
'DIRC'
:
raise
AssertionError
(
"Invalid index file header:
%r
"
%
header
)
(
version
,
num_entries
)
=
struct
.
unpack
(
b
'>LL'
,
f
.
read
(
4
*
2
))
assert
version
in
(
1
,
2
)
for
i
in
range
(
num_entries
):
yield
read_cache_entry
(
f
)
def
read_index_dict
(
f
):
"""Read an index file and return it as a dictionary.
Args:
f: File object to read from
"""
ret
=
{}
for
x
in
read_index
(
f
):
ret
[
x
[
0
]]
=
IndexEntry
(
*
x
[
1
:])
return
ret
def
write_index
(
f
,
entries
):
"""Write an index file.
Args:
f: File-like object to write to
entries: Iterable over the entries to write
"""
f
.
write
(
b
'DIRC'
)
f
.
write
(
struct
.
pack
(
b
'>LL'
,
2
,
len
(
entries
)))
for
x
in
entries
:
write_cache_entry
(
f
,
x
)
def
write_index_dict
(
f
,
entries
):
"""Write an index file based on the contents of a dictionary.
"""
entries_list
=
[]
for
name
in
sorted
(
entries
):
entries_list
.
append
((
name
,)
+
tuple
(
entries
[
name
]))
write_index
(
f
,
entries_list
)
def
cleanup_mode
(
mode
):
"""Cleanup a mode value.
This will return a mode that can be stored in a tree object.
Args:
mode: Mode to clean up.
"""
if
stat
.
S_ISLNK
(
mode
):
return
stat
.
S_IFLNK
elif
stat
.
S_ISDIR
(
mode
):
return
stat
.
S_IFDIR
elif
S_ISGITLINK
(
mode
):
return
S_IFGITLINK
ret
=
stat
.
S_IFREG
|
0
o644
if
mode
&
0
o100
:
ret
|=
0
o111
return
ret
class
Index
(
object
):
"""A Git Index file."""
def
__init__
(
self
,
filename
):
"""Open an index file.
Args:
filename: Path to the index file
"""
self
.
_filename
=
filename
self
.
clear
()
self
.
read
()
@property
def
path
(
self
):
return
self
.
_filename
def
__repr__
(
self
):
return
"
%s
(
%r
)"
%
(
self
.
__class__
.
__name__
,
self
.
_filename
)
def
write
(
self
):
"""Write current contents of index to disk."""
f
=
GitFile
(
self
.
_filename
,
'wb'
)
try
:
f
=
SHA1Writer
(
f
)
write_index_dict
(
f
,
self
.
_byname
)
finally
:
f
.
close
()
def
read
(
self
):
"""Read current contents of index from disk."""
if
not
os
.
path
.
exists
(
self
.
_filename
):
return
f
=
GitFile
(
self
.
_filename
,
'rb'
)
try
:
f
=
SHA1Reader
(
f
)
for
x
in
read_index
(
f
):
self
[
x
[
0
]]
=
IndexEntry
(
*
x
[
1
:])
# FIXME: Additional data?
f
.
read
(
os
.
path
.
getsize
(
self
.
_filename
)
-
f
.
tell
()
-
20
)
f
.
check_sha
()
finally
:
f
.
close
()
def
__len__
(
self
):
"""Number of entries in this index file."""
return
len
(
self
.
_byname
)
def
__getitem__
(
self
,
name
):
"""Retrieve entry by relative path.
Returns: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha,
flags)
"""
return
self
.
_byname
[
name
]
def
__iter__
(
self
):
"""Iterate over the paths in this index."""
return
iter
(
self
.
_byname
)
def
get_sha1
(
self
,
path
):
"""Return the (git object) SHA1 for the object at a path."""
return
self
[
path
]
.
sha
def
get_mode
(
self
,
path
):
"""Return the POSIX file mode for the object at a path."""
return
self
[
path
]
.
mode
def
iterobjects
(
self
):
"""Iterate over path, sha, mode tuples for use with commit_tree."""
for
path
in
self
:
entry
=
self
[
path
]
yield
path
,
entry
.
sha
,
cleanup_mode
(
entry
.
mode
)
def
iterblobs
(
self
):
import
warnings
warnings
.
warn
(
'Use iterobjects() instead.'
,
PendingDeprecationWarning
)
return
self
.
iterobjects
()
def
clear
(
self
):
"""Remove all contents from this index."""
self
.
_byname
=
{}
def
__setitem__
(
self
,
name
,
x
):
assert
isinstance
(
name
,
bytes
)
assert
len
(
x
)
==
10
# Remove the old entry if any
self
.
_byname
[
name
]
=
IndexEntry
(
*
x
)
def
__delitem__
(
self
,
name
):
assert
isinstance
(
name
,
bytes
)
del
self
.
_byname
[
name
]
def
iteritems
(
self
):
return
self
.
_byname
.
items
()
def
items
(
self
):
return
self
.
_byname
.
items
()
def
update
(
self
,
entries
):
for
name
,
value
in
entries
.
items
():
self
[
name
]
=
value
def
changes_from_tree
(
self
,
object_store
,
tree
,
want_unchanged
=
False
):
"""Find the differences between the contents of this index and a tree.
Args:
object_store: Object store to use for retrieving tree contents
tree: SHA1 of the root tree
want_unchanged: Whether unchanged files should be reported
Returns: Iterator over tuples with (oldpath, newpath), (oldmode,
newmode), (oldsha, newsha)
"""
def
lookup_entry
(
path
):
entry
=
self
[
path
]
return
entry
.
sha
,
cleanup_mode
(
entry
.
mode
)
for
(
name
,
mode
,
sha
)
in
changes_from_tree
(
self
.
_byname
.
keys
(),
lookup_entry
,
object_store
,
tree
,
want_unchanged
=
want_unchanged
):
yield
(
name
,
mode
,
sha
)
def
commit
(
self
,
object_store
):
"""Create a new tree from an index.
Args:
object_store: Object store to save the tree in
Returns:
Root tree SHA
"""
return
commit_tree
(
object_store
,
self
.
iterobjects
())
def
commit_tree
(
object_store
,
blobs
):
"""Commit a new tree.
Args:
object_store: Object store to add trees to
blobs: Iterable over blob path, sha, mode entries
Returns:
SHA1 of the created tree.
"""
trees
=
{
b
''
:
{}}
def
add_tree
(
path
):
if
path
in
trees
:
return
trees
[
path
]
dirname
,
basename
=
pathsplit
(
path
)
t
=
add_tree
(
dirname
)
assert
isinstance
(
basename
,
bytes
)
newtree
=
{}
t
[
basename
]
=
newtree
trees
[
path
]
=
newtree
return
newtree
for
path
,
sha
,
mode
in
blobs
:
tree_path
,
basename
=
pathsplit
(
path
)
tree
=
add_tree
(
tree_path
)
tree
[
basename
]
=
(
mode
,
sha
)
def
build_tree
(
path
):
tree
=
Tree
()
for
basename
,
entry
in
trees
[
path
]
.
items
():
if
isinstance
(
entry
,
dict
):
mode
=
stat
.
S_IFDIR
sha
=
build_tree
(
pathjoin
(
path
,
basename
))
else
:
(
mode
,
sha
)
=
entry
tree
.
add
(
basename
,
mode
,
sha
)
object_store
.
add_object
(
tree
)
return
tree
.
id
return
build_tree
(
b
''
)
def
commit_index
(
object_store
,
index
):
"""Create a new tree from an index.
Args:
object_store: Object store to save the tree in
index: Index file
Note: This function is deprecated, use index.commit() instead.
Returns: Root tree sha.
"""
return
commit_tree
(
object_store
,
index
.
iterobjects
())
def
changes_from_tree
(
names
,
lookup_entry
,
object_store
,
tree
,
want_unchanged
=
False
):
"""Find the differences between the contents of a tree and
a working copy.
Args:
names: Iterable of names in the working copy
lookup_entry: Function to lookup an entry in the working copy
object_store: Object store to use for retrieving tree contents
tree: SHA1 of the root tree, or None for an empty tree
want_unchanged: Whether unchanged files should be reported
Returns: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
(oldsha, newsha)
"""
# TODO(jelmer): Support a include_trees option
other_names
=
set
(
names
)
if
tree
is
not
None
:
for
(
name
,
mode
,
sha
)
in
object_store
.
iter_tree_contents
(
tree
):
try
:
(
other_sha
,
other_mode
)
=
lookup_entry
(
name
)
except
KeyError
:
# Was removed
yield
((
name
,
None
),
(
mode
,
None
),
(
sha
,
None
))
else
:
other_names
.
remove
(
name
)
if
(
want_unchanged
or
other_sha
!=
sha
or
other_mode
!=
mode
):
yield
((
name
,
name
),
(
mode
,
other_mode
),
(
sha
,
other_sha
))
# Mention added files
for
name
in
other_names
:
try
:
(
other_sha
,
other_mode
)
=
lookup_entry
(
name
)
except
KeyError
:
pass
else
:
yield
((
None
,
name
),
(
None
,
other_mode
),
(
None
,
other_sha
))
def
index_entry_from_stat
(
stat_val
,
hex_sha
,
flags
,
mode
=
None
):
"""Create a new index entry from a stat value.
Args:
stat_val: POSIX stat_result instance
hex_sha: Hex sha of the object
flags: Index flags
"""
if
mode
is
None
:
mode
=
cleanup_mode
(
stat_val
.
st_mode
)
return
IndexEntry
(
stat_val
.
st_ctime
,
stat_val
.
st_mtime
,
stat_val
.
st_dev
,
stat_val
.
st_ino
,
mode
,
stat_val
.
st_uid
,
stat_val
.
st_gid
,
stat_val
.
st_size
,
hex_sha
,
flags
)
def
build_file_from_blob
(
blob
,
mode
,
target_path
,
honor_filemode
=
True
,
tree_encoding
=
'utf-8'
):
"""Build a file or symlink on disk based on a Git object.
Args:
obj: The git object
mode: File mode
target_path: Path to write to
honor_filemode: An optional flag to honor core.filemode setting in
config file, default is core.filemode=True, change executable bit
Returns: stat object for the file
"""
try
:
oldstat
=
os
.
lstat
(
target_path
)
except
FileNotFoundError
:
oldstat
=
None
contents
=
blob
.
as_raw_string
()
if
stat
.
S_ISLNK
(
mode
):
# FIXME: This will fail on Windows. What should we do instead?
if
oldstat
:
os
.
unlink
(
target_path
)
if
sys
.
platform
==
'win32'
:
# os.readlink on Python3 on Windows requires a unicode string.
contents
=
contents
.
decode
(
tree_encoding
)
target_path
=
target_path
.
decode
(
tree_encoding
)
os
.
symlink
(
contents
,
target_path
)
else
:
if
oldstat
is
not
None
and
oldstat
.
st_size
==
len
(
contents
):
with
open
(
target_path
,
'rb'
)
as
f
:
if
f
.
read
()
==
contents
:
return
oldstat
with
open
(
target_path
,
'wb'
)
as
f
:
# Write out file
f
.
write
(
contents
)
if
honor_filemode
:
os
.
chmod
(
target_path
,
mode
)
return
os
.
lstat
(
target_path
)
INVALID_DOTNAMES
=
(
b
".git"
,
b
"."
,
b
".."
,
b
""
)
def
validate_path_element_default
(
element
):
return
element
.
lower
()
not
in
INVALID_DOTNAMES
def
validate_path_element_ntfs
(
element
):
stripped
=
element
.
rstrip
(
b
". "
)
.
lower
()
if
stripped
in
INVALID_DOTNAMES
:
return
False
if
stripped
==
b
"git~1"
:
return
False
return
True
def
validate_path
(
path
,
element_validator
=
validate_path_element_default
):
"""Default path validator that just checks for .git/."""
parts
=
path
.
split
(
b
"/"
)
for
p
in
parts
:
if
not
element_validator
(
p
):
return
False
else
:
return
True
def
build_index_from_tree
(
root_path
,
index_path
,
object_store
,
tree_id
,
honor_filemode
=
True
,
validate_path_element
=
validate_path_element_default
):
"""Generate and materialize index from a tree
Args:
tree_id: Tree to materialize
root_path: Target dir for materialized index files
index_path: Target path for generated index
object_store: Non-empty object store holding tree contents
honor_filemode: An optional flag to honor core.filemode setting in
config file, default is core.filemode=True, change executable bit
validate_path_element: Function to validate path elements to check
out; default just refuses .git and .. directories.
Note: existing index is wiped and contents are not merged
in a working dir. Suitable only for fresh clones.
"""
index
=
Index
(
index_path
)
if
not
isinstance
(
root_path
,
bytes
):
root_path
=
os
.
fsencode
(
root_path
)
for
entry
in
object_store
.
iter_tree_contents
(
tree_id
):
if
not
validate_path
(
entry
.
path
,
validate_path_element
):
continue
full_path
=
_tree_to_fs_path
(
root_path
,
entry
.
path
)
if
not
os
.
path
.
exists
(
os
.
path
.
dirname
(
full_path
)):
os
.
makedirs
(
os
.
path
.
dirname
(
full_path
))
# TODO(jelmer): Merge new index into working tree
if
S_ISGITLINK
(
entry
.
mode
):
if
not
os
.
path
.
isdir
(
full_path
):
os
.
mkdir
(
full_path
)
st
=
os
.
lstat
(
full_path
)
# TODO(jelmer): record and return submodule paths
else
:
obj
=
object_store
[
entry
.
sha
]
st
=
build_file_from_blob
(
obj
,
entry
.
mode
,
full_path
,
honor_filemode
=
honor_filemode
)
# Add file to index
if
not
honor_filemode
or
S_ISGITLINK
(
entry
.
mode
):
# we can not use tuple slicing to build a new tuple,
# because on windows that will convert the times to
# longs, which causes errors further along
st_tuple
=
(
entry
.
mode
,
st
.
st_ino
,
st
.
st_dev
,
st
.
st_nlink
,
st
.
st_uid
,
st
.
st_gid
,
st
.
st_size
,
st
.
st_atime
,
st
.
st_mtime
,
st
.
st_ctime
)
st
=
st
.
__class__
(
st_tuple
)
index
[
entry
.
path
]
=
index_entry_from_stat
(
st
,
entry
.
sha
,
0
)
index
.
write
()
def
blob_from_path_and_stat
(
fs_path
,
st
,
tree_encoding
=
'utf-8'
):
"""Create a blob from a path and a stat object.
Args:
fs_path: Full file system path to file
st: A stat object
Returns: A `Blob` object
"""
assert
isinstance
(
fs_path
,
bytes
)
blob
=
Blob
()
if
stat
.
S_ISLNK
(
st
.
st_mode
):
if
sys
.
platform
==
'win32'
:
# os.readlink on Python3 on Windows requires a unicode string.
fs_path
=
os
.
fsdecode
(
fs_path
)
blob
.
data
=
os
.
readlink
(
fs_path
)
.
encode
(
tree_encoding
)
else
:
blob
.
data
=
os
.
readlink
(
fs_path
)
else
:
with
open
(
fs_path
,
'rb'
)
as
f
:
blob
.
data
=
f
.
read
()
return
blob
def
read_submodule_head
(
path
):
"""Read the head commit of a submodule.
Args:
path: path to the submodule
Returns: HEAD sha, None if not a valid head/repository
"""
from
dulwich.errors
import
NotGitRepository
from
dulwich.repo
import
Repo
# Repo currently expects a "str", so decode if necessary.
# TODO(jelmer): Perhaps move this into Repo() ?
if
not
isinstance
(
path
,
str
):
path
=
os
.
fsdecode
(
path
)
try
:
repo
=
Repo
(
path
)
except
NotGitRepository
:
return
None
try
:
return
repo
.
head
()
except
KeyError
:
return
None
def
_has_directory_changed
(
tree_path
,
entry
):
"""Check if a directory has changed after getting an error.
When handling an error trying to create a blob from a path, call this
function. It will check if the path is a directory. If it's a directory
and a submodule, check the submodule head to see if it's has changed. If
not, consider the file as changed as Git tracked a file and not a
directory.
Return true if the given path should be considered as changed and False
otherwise or if the path is not a directory.
"""
# This is actually a directory
if
os
.
path
.
exists
(
os
.
path
.
join
(
tree_path
,
b
'.git'
)):
# Submodule
head
=
read_submodule_head
(
tree_path
)
if
entry
.
sha
!=
head
:
return
True
else
:
# The file was changed to a directory, so consider it removed.
return
True
return
False
def
get_unstaged_changes
(
index
,
root_path
,
filter_blob_callback
=
None
):
"""Walk through an index and check for differences against working tree.
Args:
index: index to check
root_path: path in which to find files
Returns: iterator over paths with unstaged changes
"""
# For each entry in the index check the sha1 & ensure not staged
if
not
isinstance
(
root_path
,
bytes
):
root_path
=
os
.
fsencode
(
root_path
)
for
tree_path
,
entry
in
index
.
iteritems
():
full_path
=
_tree_to_fs_path
(
root_path
,
tree_path
)
try
:
st
=
os
.
lstat
(
full_path
)
if
stat
.
S_ISDIR
(
st
.
st_mode
):
if
_has_directory_changed
(
tree_path
,
entry
):
yield
tree_path
continue
if
not
stat
.
S_ISREG
(
st
.
st_mode
)
and
not
stat
.
S_ISLNK
(
st
.
st_mode
):
continue
blob
=
blob_from_path_and_stat
(
full_path
,
st
)
if
filter_blob_callback
is
not
None
:
blob
=
filter_blob_callback
(
blob
,
tree_path
)
except
FileNotFoundError
:
# The file was removed, so we assume that counts as
# different from whatever file used to exist.
yield
tree_path
else
:
if
blob
.
id
!=
entry
.
sha
:
yield
tree_path
os_sep_bytes
=
os
.
sep
.
encode
(
'ascii'
)
def
_tree_to_fs_path
(
root_path
,
tree_path
):
"""Convert a git tree path to a file system path.
Args:
root_path: Root filesystem path
tree_path: Git tree path as bytes
Returns: File system path.
"""
assert
isinstance
(
tree_path
,
bytes
)
if
os_sep_bytes
!=
b
'/'
:
sep_corrected_path
=
tree_path
.
replace
(
b
'/'
,
os_sep_bytes
)
else
:
sep_corrected_path
=
tree_path
return
os
.
path
.
join
(
root_path
,
sep_corrected_path
)
def
_fs_to_tree_path
(
fs_path
):
"""Convert a file system path to a git tree path.
Args:
fs_path: File system path.
Returns: Git tree path as bytes
"""
if
not
isinstance
(
fs_path
,
bytes
):
fs_path_bytes
=
os
.
fsencode
(
fs_path
)
else
:
fs_path_bytes
=
fs_path
if
os_sep_bytes
!=
b
'/'
:
tree_path
=
fs_path_bytes
.
replace
(
os_sep_bytes
,
b
'/'
)
else
:
tree_path
=
fs_path_bytes
return
tree_path
def
index_entry_from_path
(
path
,
object_store
=
None
):
"""Create an index from a filesystem path.
This returns an index value for files, symlinks
and tree references. for directories and
non-existant files it returns None
Args:
path: Path to create an index entry for
object_store: Optional object store to
save new blobs in
Returns: An index entry; None for directories
"""
assert
isinstance
(
path
,
bytes
)
st
=
os
.
lstat
(
path
)
if
stat
.
S_ISDIR
(
st
.
st_mode
):
if
os
.
path
.
exists
(
os
.
path
.
join
(
path
,
b
'.git'
)):
head
=
read_submodule_head
(
path
)
if
head
is
None
:
return
None
return
index_entry_from_stat
(
st
,
head
,
0
,
mode
=
S_IFGITLINK
)
return
None
if
stat
.
S_ISREG
(
st
.
st_mode
)
or
stat
.
S_ISLNK
(
st
.
st_mode
):
blob
=
blob_from_path_and_stat
(
path
,
st
)
if
object_store
is
not
None
:
object_store
.
add_object
(
blob
)
return
index_entry_from_stat
(
st
,
blob
.
id
,
0
)
return
None
def
iter_fresh_entries
(
paths
,
root_path
,
object_store
=
None
):
"""Iterate over current versions of index entries on disk.
Args:
paths: Paths to iterate over
root_path: Root path to access from
store: Optional store to save new blobs in
Returns: Iterator over path, index_entry
"""
for
path
in
paths
:
p
=
_tree_to_fs_path
(
root_path
,
path
)
try
:
entry
=
index_entry_from_path
(
p
,
object_store
=
object_store
)
except
(
FileNotFoundError
,
IsADirectoryError
):
entry
=
None
yield
path
,
entry
def
iter_fresh_blobs
(
index
,
root_path
):
"""Iterate over versions of blobs on disk referenced by index.
Don't use this function; it removes missing entries from index.
Args:
index: Index file
root_path: Root path to access from
include_deleted: Include deleted entries with sha and
mode set to None
Returns: Iterator over path, sha, mode
"""
import
warnings
warnings
.
warn
(
PendingDeprecationWarning
,
"Use iter_fresh_objects instead."
)
for
entry
in
iter_fresh_objects
(
index
,
root_path
,
include_deleted
=
True
):
if
entry
[
1
]
is
None
:
del
index
[
entry
[
0
]]
else
:
yield
entry
def
iter_fresh_objects
(
paths
,
root_path
,
include_deleted
=
False
,
object_store
=
None
):
"""Iterate over versions of objecs on disk referenced by index.
Args:
index: Index file
root_path: Root path to access from
include_deleted: Include deleted entries with sha and
mode set to None
object_store: Optional object store to report new items to
Returns: Iterator over path, sha, mode
"""
for
path
,
entry
in
iter_fresh_entries
(
paths
,
root_path
,
object_store
=
object_store
):
if
entry
is
None
:
if
include_deleted
:
yield
path
,
None
,
None
else
:
entry
=
IndexEntry
(
*
entry
)
yield
path
,
entry
.
sha
,
cleanup_mode
(
entry
.
mode
)
def
refresh_index
(
index
,
root_path
):
"""Refresh the contents of an index.
This is the equivalent to running 'git commit -a'.
Args:
index: Index to update
root_path: Root filesystem path
"""
for
path
,
entry
in
iter_fresh_entries
(
index
,
root_path
):
index
[
path
]
=
path
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 6:48 PM (12 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3398779
Attached To
rPPDW python3-dulwich packaging
Event Timeline
Log In to Comment