Page MenuHomeSoftware Heritage

git.py
No OneTemporary

# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import stat
from enum import Enum, IntEnum
from swh.model import hashutil, identifiers
ROOT_TREE_KEY = b''
class GitType(Enum):
BLOB = b'blob'
TREE = b'tree'
EXEC = b'exec'
LINK = b'link'
COMM = b'commit'
RELE = b'release'
REFS = b'ref'
class GitPerm(IntEnum):
BLOB = 0o100644
TREE = 0o040000
EXEC = 0o100755
LINK = 0o120000
def _compute_directory_git_sha1(hashes):
"""Compute a directory git sha1 from hashes.
Args:
hashes: list of tree entries with keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
the binary sha1 of the dictionary's identifier
Assumes:
Every path exists in hashes.
"""
directory = {
'entries':
[
{
'name': entry['name'],
'perms': entry['perms'],
'target': entry['sha1_git'],
'type': 'dir' if entry['perms'] == GitPerm.TREE else 'file',
}
for entry in hashes
]
}
return hashutil.hash_to_bytes(identifiers.directory_identifier(directory))
def compute_directory_git_sha1(dirpath, hashes):
"""Compute a directory git sha1 for a dirpath.
Args:
dirpath: the directory's absolute path
hashes: list of tree entries with keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
the binary sha1 of the dictionary's identifier
Assumes:
Every path exists in hashes.
"""
return _compute_directory_git_sha1(hashes[dirpath])
def compute_revision_sha1_git(revision):
"""Compute a revision sha1 git from its dict representation.
Args:
revision: Additional dictionary information needed to compute a
synthetic
revision. Following keys are expected:
- author
- date
- committer
- committer_date
- message
- type
- directory: binary form of the tree hash
Returns:
revision sha1 in bytes
# FIXME: beware, bytes output from storage api
"""
return hashutil.hash_to_bytes(identifiers.revision_identifier(revision))
def compute_release_sha1_git(release):
"""Compute a release sha1 git from its dict representation.
Args:
release: Additional dictionary information needed to compute a
synthetic release. Following keys are expected:
- name
- message
- date
- author
- revision: binary form of the sha1_git revision targeted by this
Returns:
release sha1 in bytes
"""
return hashutil.hash_to_bytes(identifiers.release_identifier(release))
def compute_link_metadata(linkpath):
"""Given a linkpath, compute the git metadata.
Args:
linkpath: absolute pathname of the link
Returns:
Dictionary of values:
- data: link's content
- length: link's content length
- name: basename of the link
- perms: git permission for link
- type: git type for link
- path: absolute path to the link on filesystem
"""
data = os.readlink(linkpath)
link_metadata = hashutil.hash_data(data)
link_metadata.update({
'data': data,
'length': len(data),
'name': os.path.basename(linkpath),
'perms': GitPerm.LINK,
'type': GitType.BLOB,
'path': linkpath
})
return link_metadata
def compute_blob_metadata(filepath):
"""Given a filepath resolving to a regular file, compute the metadata.
Other file types (fifo, character or block device, symlink) will
be considered empty regular file. To deal properly with symlinks,
use swh.model.git.compute_link_metadata.
Args:
filepath: absolute pathname of the regular file.
Returns:
Dictionary of values:
- name: basename of the file
- length: data length
- perms: git permission for file
- type: git type for file
- path: absolute filepath on filesystem
"""
mode = os.lstat(filepath).st_mode
if not stat.S_ISREG(mode): # special (block or character device, fifo)
perms = GitPerm.BLOB
blob_metadata = hashutil.hash_data(b'')
blob_metadata['length'] = 0
else:
perms = GitPerm.EXEC if os.access(filepath, os.X_OK) else GitPerm.BLOB
blob_metadata = hashutil.hash_path(filepath)
blob_metadata.update({
'name': os.path.basename(filepath),
'perms': perms,
'type': GitType.BLOB,
'path': filepath
})
return blob_metadata
def _compute_tree_metadata(dirname, hashes):
"""Given a dirname, compute the git metadata.
Args:
dirname: absolute pathname of the directory.
hashes: list of tree dirname's entries with keys:
- sha1_git: the tree entry's sha1
- name: file or subdir's name
- perms: the tree entry's sha1 permissions
Returns:
Dictionary of values:
- sha1_git: tree's sha1 git
- name: basename of the directory
- perms: git permission for directory
- type: git type for directory
- path: absolute path to directory on filesystem
"""
return {
'sha1_git': _compute_directory_git_sha1(hashes),
'name': os.path.basename(dirname),
'perms': GitPerm.TREE,
'type': GitType.TREE,
'path': dirname
}
def compute_tree_metadata(dirname, ls_hashes):
"""Given a dirname, compute the git metadata.
Args:
dirname: absolute pathname of the directory.
ls_hashes: dictionary of path, hashes
Returns:
Dictionary of values:
- sha1_git: tree's sha1 git
- name: basename of the directory
- perms: git permission for directory
- type: git type for directory
- path: absolute path to directory on filesystem
"""
return _compute_tree_metadata(dirname, ls_hashes[dirname])
def default_validation_dir(dirpath):
"""Default validation function.
This is the equivalent of the identity function.
Args:
dirpath: Path to validate
Returns: True
"""
return True
def _walk(rootdir,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Walk the filesystem and yields a 3 tuples (dirpath, dirnames as set
of absolute paths, filenames as set of abslute paths)
Ignore files which won't pass the dir_ok_fn validation.
If remove_empty_folder is True, remove and ignore any
encountered empty folder.
Args:
- rootdir: starting walk root directory path
- dir_ok_fn: validation function. if folder encountered are
not ok, they are ignored. Default to default_validation_dir
which does nothing.
- remove_empty_folder: Flag to remove and ignore any
encountered empty folders.
Yields:
3 tuples dirpath, set of absolute children dirname paths, set
of absolute filename paths.
"""
def basic_gen_dir(rootdir):
for dp, dns, fns in os.walk(rootdir, topdown=False):
yield (dp,
set((os.path.join(dp, dn) for dn in dns)),
set((os.path.join(dp, fn) for fn in fns)))
if dir_ok_fn == default_validation_dir:
if not remove_empty_folder: # os.walk
yield from basic_gen_dir(rootdir)
else: # os.walk + empty dir cleanup
empty_folders = set()
for dp, dns, fns in basic_gen_dir(rootdir):
if not dns and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
os.remove(dp)
else:
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield (dp, dns - empty_folders, fns)
else:
def filtfn(dirnames):
return set(filter(dir_ok_fn, dirnames))
gen_dir = ((dp, dns, fns) for dp, dns, fns
in basic_gen_dir(rootdir) if dir_ok_fn(dp))
if not remove_empty_folder: # os.walk + filtering
for dp, dns, fns in gen_dir:
yield (dp, filtfn(dns), fns)
else: # os.walk + filtering + empty dir cleanup
empty_folders = set()
for dp, dns, fns in gen_dir:
dps = filtfn(dns)
if not dps and not fns:
empty_folders.add(dp)
# need to remove it because folder of empty folder
# is an empty folder!!!
if os.path.islink(dp):
os.remove(dp)
else:
os.rmdir(dp)
parent = os.path.dirname(dp)
# edge case about parent containing one empty
# folder which become an empty one
while not os.listdir(parent):
empty_folders.add(parent)
if os.path.islink(parent):
os.remove(parent)
else:
os.rmdir(parent)
parent = os.path.dirname(parent)
continue
yield dp, dps - empty_folders, fns
def walk_and_compute_sha1_from_directory(rootdir,
dir_ok_fn=default_validation_dir,
with_root_tree=True,
remove_empty_folder=False):
"""(Deprecated) TODO migrate the code to
compute_hashes_from_directory.
Compute git sha1 from directory rootdir.
Args:
- rootdir: Root directory from which beginning the git hash computation
- dir_ok_fn: Filter function to filter directory according to rules
defined in the function. By default, all folders are ok.
Example override: dir_ok_fn = lambda dirpath: b'svn' not in dirpath
- with_root_tree: Determine if we compute the upper root tree's
checksums. As a default, we want it. One possible use case where this
is not useful is the update (cf. `update_checksums_from`)
Returns:
Dictionary of entries with keys <path-name> and as values a list of
directory entries.
Those are list of dictionary with keys:
- 'perms'
- 'type'
- 'name'
- 'sha1_git'
- and specifically content: 'sha1', 'sha256', ...
Note:
One special key is ROOT_TREE_KEY to indicate the upper root of the
directory (this is the revision's directory).
Raises:
Nothing
If something is raised, this is a programmatic error.
"""
ls_hashes = {}
all_links = set()
if rootdir.endswith(b'/'):
rootdir = rootdir.rstrip(b'/')
for dirpath, dirnames, filenames in _walk(
rootdir, dir_ok_fn, remove_empty_folder):
hashes = []
links = (file
for file in filenames.union(dirnames)
if os.path.islink(file))
for linkpath in links:
all_links.add(linkpath)
m_hashes = compute_link_metadata(linkpath)
hashes.append(m_hashes)
for filepath in (file for file in filenames if file not in all_links):
m_hashes = compute_blob_metadata(filepath)
hashes.append(m_hashes)
ls_hashes[dirpath] = hashes
dir_hashes = []
for fulldirname in (dir for dir in dirnames if dir not in all_links):
tree_hash = _compute_tree_metadata(fulldirname,
ls_hashes[fulldirname])
dir_hashes.append(tree_hash)
ls_hashes[dirpath].extend(dir_hashes)
if with_root_tree:
# compute the current directory hashes
root_hash = {
'sha1_git': _compute_directory_git_sha1(ls_hashes[rootdir]),
'path': rootdir,
'name': os.path.basename(rootdir),
'perms': GitPerm.TREE,
'type': GitType.TREE
}
ls_hashes[ROOT_TREE_KEY] = [root_hash]
return ls_hashes
def compute_hashes_from_directory(rootdir,
dir_ok_fn=default_validation_dir,
remove_empty_folder=False):
"""Compute git sha1 from directory rootdir.
Args:
- rootdir: Root directory from which beginning the git hash
computation
- dir_ok_fn: Filter function to filter directory according to rules
defined in the function. By default, all folders are ok.
Example override: dir_ok_fn = lambda dirpath: b'svn' not in dirpath
Returns:
Dictionary of entries with keys absolute path name.
Path-name can be a file/link or directory.
The associated value is a dictionary with:
- checksums: the dictionary with the hashes for the link/file/dir
Those are list of dictionary with keys:
- 'perms'
- 'type'
- 'name'
- 'sha1_git'
- and specifically content: 'sha1', 'sha256', ...
- children: Only for a directory, the set of children paths
Note:
One special key is the / which indicates the upper root of
the directory (this is the revision's directory).
Raises:
Nothing
If something is raised, this is a programmatic error.
"""
def _get_dict_from_dirpath(_dict, path):
"""Retrieve the default associated value for key path.
"""
return _dict.get(path, dict(children=set(), checksums=None))
def _get_dict_from_filepath(_dict, path):
"""Retrieve the default associated value for key path.
"""
return _dict.get(path, dict(checksums=None))
ls_hashes = {}
all_links = set()
if rootdir.endswith(b'/'):
rootdir = rootdir.rstrip(b'/')
for dirpath, dirnames, filenames in _walk(
rootdir, dir_ok_fn, remove_empty_folder):
dir_entry = _get_dict_from_dirpath(ls_hashes, dirpath)
children = dir_entry['children']
links = (file
for file in filenames.union(dirnames)
if os.path.islink(file))
for linkpath in links:
all_links.add(linkpath)
m_hashes = compute_link_metadata(linkpath)
d = _get_dict_from_filepath(ls_hashes, linkpath)
d['checksums'] = m_hashes
ls_hashes[linkpath] = d
children.add(linkpath)
for filepath in (file for file in filenames if file not in all_links):
m_hashes = compute_blob_metadata(filepath)
d = _get_dict_from_filepath(ls_hashes, filepath)
d['checksums'] = m_hashes
ls_hashes[filepath] = d
children.add(filepath)
for fulldirname in (dir for dir in dirnames if dir not in all_links):
d_hashes = _get_dict_from_dirpath(ls_hashes, fulldirname)
tree_hash = _compute_tree_metadata(
fulldirname,
(ls_hashes[p]['checksums'] for p in d_hashes['children'])
)
d = _get_dict_from_dirpath(ls_hashes, fulldirname)
d['checksums'] = tree_hash
ls_hashes[fulldirname] = d
children.add(fulldirname)
dir_entry['children'] = children
ls_hashes[dirpath] = dir_entry
# compute the current directory hashes
d_hashes = _get_dict_from_dirpath(ls_hashes, rootdir)
root_hash = {
'sha1_git': _compute_directory_git_sha1(
(ls_hashes[p]['checksums'] for p in d_hashes['children'])
),
'path': rootdir,
'name': os.path.basename(rootdir),
'perms': GitPerm.TREE,
'type': GitType.TREE
}
d_hashes['checksums'] = root_hash
ls_hashes[rootdir] = d_hashes
return ls_hashes
def children_hashes(children, objects):
"""Given a collection of children path, yield the corresponding
hashes.
Args:
objects: objects hash as returned by git.compute_hashes_from_directory.
children: collection of bytes path
Yields:
Dictionary hashes
"""
for p in children:
c = objects.get(p)
if c:
h = c.get('checksums')
if h:
yield h
def objects_per_type(filter_type, objects_per_path):
"""Given an object dictionary returned by
`swh.model.git.compute_hashes_from_directory`, yields
corresponding element type's hashes
Args:
filter_type: one of GitType enum
objects_per_path:
Yields:
Elements of type filter_type's hashes
"""
for path, obj in objects_per_path.items():
o = obj['checksums']
if o['type'] == filter_type:
if 'children' in obj: # for trees
if obj['children']:
o['children'] = children_hashes(obj['children'],
objects_per_path)
else:
o['children'] = []
yield o

File Metadata

Mime Type
text/x-python
Expires
Fri, Jul 4, 3:04 PM (4 d, 9 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3452604

Event Timeline