diff --git a/dulwich/contrib/release_robot.py b/dulwich/contrib/release_robot.py
index a54e1d4e..f2f280e0 100644
--- a/dulwich/contrib/release_robot.py
+++ b/dulwich/contrib/release_robot.py
@@ -1,143 +1,147 @@
# release_robot.py
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Determine last version string from tags.
Alternate to `Versioneer `_ using
`Dulwich `_ to sort tags by time from
newest to oldest.
Copy the following into the package ``__init__.py`` module::
from dulwich.contrib.release_robot import get_current_version
__version__ = get_current_version()
This example assumes the tags have a leading "v" like "v0.3", and that the
``.git`` folder is in a project folder that contains the package folder.
EG::
    * project
    |
    * .git
    |
    +-* package
      |
      * __init__.py    <-- put __version__ here
"""
import datetime
import re
import sys
import time
from dulwich.repo import Repo
# CONSTANTS
PROJDIR = '.'
PATTERN = r'[ a-zA-Z_\-]*([\d\.]+[\-\w\.]*)'
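# For example, re.match(PATTERN, 'Release-0.2.1-rc.1').group(1) yields '0.2.1-rc.1'.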
def get_recent_tags(projdir=PROJDIR):
"""Get list of tags in order from newest to oldest and their datetimes.
- :param projdir: path to ``.git``
- :returns: list of tags sorted by commit time from newest to oldest
+ Args:
+ projdir: path to ``.git``
+ Returns:
+ list of tags sorted by commit time from newest to oldest
Each tag in the list contains the tag name, commit time, commit id, author
and any tag meta. If a tag isn't annotated, then its tag meta is ``None``.
Otherwise the tag meta is a tuple containing the tag time, tag id and tag
name. Time is in UTC.
"""
with Repo(projdir) as project: # dulwich repository object
refs = project.get_refs() # dictionary of refs and their SHA-1 values
tags = {} # empty dictionary to hold tags, commits and datetimes
# iterate over refs in repository
for key, value in refs.items():
key = key.decode('utf-8') # compatible with Python-3
obj = project.get_object(value) # dulwich object from SHA-1
# don't just check if object is "tag" b/c it could be a "commit"
# instead check if "tags" is in the ref-name
if u'tags' not in key:
# skip ref if not a tag
continue
# strip the leading text from refs to get "tag name"
_, tag = key.rsplit(u'/', 1)
# check if tag object is "commit" or "tag" pointing to a "commit"
try:
commit = obj.object # a tuple (commit class, commit id)
except AttributeError:
commit = obj
tag_meta = None
else:
tag_meta = (
datetime.datetime(*time.gmtime(obj.tag_time)[:6]),
obj.id.decode('utf-8'),
obj.name.decode('utf-8')
) # compatible with Python-3
commit = project.get_object(commit[1]) # commit object
# get tag commit datetime, but dulwich returns seconds since
# beginning of epoch, so use Python time module to convert it to
# timetuple then convert to datetime
tags[tag] = [
datetime.datetime(*time.gmtime(commit.commit_time)[:6]),
commit.id.decode('utf-8'),
commit.author.decode('utf-8'),
tag_meta
] # compatible with Python-3
# return list of tags sorted by their datetimes from newest to oldest
return sorted(tags.items(), key=lambda tag: tag[1][0], reverse=True)
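# Illustrative (hypothetical) return value:
#   [('v0.3', [datetime.datetime(2019, 1, 1, 0, 0), '<commit sha>',
#              'Alice <alice@example.com>', None]), ...]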
def get_current_version(projdir=PROJDIR, pattern=PATTERN, logger=None):
"""Return the most recent tag, using an options regular expression pattern.
The default pattern will strip any characters preceding the first semantic
version. *EG*: "Release-0.2.1-rc.1" will be come "0.2.1-rc.1". If no match
is found, then the most recent tag is return without modification.
- :param projdir: path to ``.git``
- :param pattern: regular expression pattern with group that matches version
- :param logger: a Python logging instance to capture exception
- :returns: tag matching first group in regular expression pattern
+ Args:
+ projdir: path to ``.git``
+ pattern: regular expression pattern with group that matches version
+ logger: a Python logging instance to capture exception
+ Returns:
+ tag matching first group in regular expression pattern
"""
tags = get_recent_tags(projdir)
try:
tag = tags[0][0]
except IndexError:
return
matches = re.match(pattern, tag)
try:
current_version = matches.group(1)
except (IndexError, AttributeError) as err:
if logger:
logger.exception(err)
return tag
return current_version
if __name__ == '__main__':
if len(sys.argv) > 1:
_PROJDIR = sys.argv[1]
else:
_PROJDIR = PROJDIR
print(get_current_version(projdir=_PROJDIR))
diff --git a/dulwich/contrib/swift.py b/dulwich/contrib/swift.py
index bdad92ed..8436c252 100644
--- a/dulwich/contrib/swift.py
+++ b/dulwich/contrib/swift.py
@@ -1,1037 +1,1056 @@
# swift.py -- Repo implementation atop OpenStack SWIFT
# Copyright (C) 2013 eNovance SAS
#
# Author: Fabien Boucher
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Repo implementation atop OpenStack SWIFT."""
# TODO: Refactor to share more code with dulwich/repo.py.
# TODO(fbo): Second attempt to _send() must be notified via real log
# TODO(fbo): More logs for operations
import os
import stat
import zlib
import tempfile
import posixpath
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
from io import BytesIO
try:
from ConfigParser import ConfigParser
except ImportError:
from configparser import ConfigParser
from geventhttpclient import HTTPClient
from dulwich.greenthreads import (
GreenThreadsMissingObjectFinder,
GreenThreadsObjectStoreIterator,
)
from dulwich.lru_cache import LRUSizeCache
from dulwich.objects import (
Blob,
Commit,
Tree,
Tag,
S_ISGITLINK,
)
from dulwich.object_store import (
PackBasedObjectStore,
PACKDIR,
INFODIR,
)
from dulwich.pack import (
PackData,
Pack,
PackIndexer,
PackStreamCopier,
write_pack_header,
compute_file_sha,
iter_sha1,
write_pack_index_v2,
load_pack_index_file,
read_pack_header,
_compute_object_size,
unpack_object,
write_pack_object,
)
from dulwich.protocol import TCP_GIT_PORT
from dulwich.refs import (
InfoRefsContainer,
read_info_refs,
write_info_refs,
)
from dulwich.repo import (
BaseRepo,
OBJECTDIR,
)
from dulwich.server import (
Backend,
TCPGitServer,
)
try:
from simplejson import loads as json_loads
from simplejson import dumps as json_dumps
except ImportError:
from json import loads as json_loads
from json import dumps as json_dumps
import sys
"""
# Configuration file sample
[swift]
# Authentication URL (Keystone or Swift)
auth_url = http://127.0.0.1:5000/v2.0
# Authentication version to use
auth_ver = 2
# The tenant and username separated by a semicolon
username = admin;admin
# The user password
password = pass
# The Object storage region to use (auth v2) (Default RegionOne)
region_name = RegionOne
# The Object storage endpoint URL to use (auth v2) (Default internalURL)
endpoint_type = internalURL
# Concurrency to use for parallel tasks (Default 10)
concurrency = 10
# Size of the HTTP pool (Default 10)
http_pool_length = 10
# Timeout delay for HTTP connections (Default 20)
http_timeout = 20
# Chunk size to read from pack (Bytes) (Default 12228)
chunk_length = 12228
# Cache size (MBytes) (Default 20)
cache_length = 20
"""
class PackInfoObjectStoreIterator(GreenThreadsObjectStoreIterator):
def __len__(self):
while len(self.finder.objects_to_send):
for _ in range(0, len(self.finder.objects_to_send)):
sha = self.finder.next()
self._shas.append(sha)
return len(self._shas)
class PackInfoMissingObjectFinder(GreenThreadsMissingObjectFinder):
def next(self):
while True:
if not self.objects_to_send:
return None
(sha, name, leaf) = self.objects_to_send.pop()
if sha not in self.sha_done:
break
if not leaf:
info = self.object_store.pack_info_get(sha)
if info[0] == Commit.type_num:
self.add_todo([(info[2], "", False)])
elif info[0] == Tree.type_num:
self.add_todo([tuple(i) for i in info[1]])
elif info[0] == Tag.type_num:
self.add_todo([(info[1], None, False)])
if sha in self._tagged:
self.add_todo([(self._tagged[sha], None, True)])
self.sha_done.add(sha)
self.progress("counting objects: %d\r" % len(self.sha_done))
return (sha, name)
def load_conf(path=None, file=None):
"""Load configuration in global var CONF
- :param path: The path to the configuration file
- :param file: If provided read instead the file like object
+ Args:
+ path: The path to the configuration file
+ file: If provided, read this file-like object instead of the path
"""
conf = ConfigParser()
if file:
try:
conf.read_file(file, path)
except AttributeError:
# read_file only exists in Python3
conf.readfp(file)
return conf
confpath = None
if not path:
try:
confpath = os.environ['DULWICH_SWIFT_CFG']
except KeyError:
raise Exception("You need to specify a configuration file")
else:
confpath = path
if not os.path.isfile(confpath):
raise Exception("Unable to read configuration file %s" % confpath)
conf.read(confpath)
return conf
def swift_load_pack_index(scon, filename):
"""Read a pack index file from Swift
- :param scon: a `SwiftConnector` instance
- :param filename: Path to the index file objectise
- :return: a `PackIndexer` instance
+ Args:
+ scon: a `SwiftConnector` instance
+ filename: Path to the index file object
+ Returns: a `PackIndex` instance
"""
with scon.get_object(filename) as f:
return load_pack_index_file(filename, f)
def pack_info_create(pack_data, pack_index):
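    # Build a JSON-serializable map of sha -> type-specific metadata, then zlib-compress it.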
pack = Pack.from_objects(pack_data, pack_index)
info = {}
for obj in pack.iterobjects():
# Commit
if obj.type_num == Commit.type_num:
info[obj.id] = (obj.type_num, obj.parents, obj.tree)
# Tree
elif obj.type_num == Tree.type_num:
shas = [(s, n, not stat.S_ISDIR(m)) for
n, m, s in obj.items() if not S_ISGITLINK(m)]
info[obj.id] = (obj.type_num, shas)
# Blob
elif obj.type_num == Blob.type_num:
info[obj.id] = None
# Tag
elif obj.type_num == Tag.type_num:
info[obj.id] = (obj.type_num, obj.object[1])
return zlib.compress(json_dumps(info))
def load_pack_info(filename, scon=None, file=None):
if not file:
f = scon.get_object(filename)
else:
f = file
if not f:
return None
try:
return json_loads(zlib.decompress(f.read()))
finally:
f.close()
class SwiftException(Exception):
pass
class SwiftConnector(object):
"""A Connector to swift that manage authentication and errors catching
"""
def __init__(self, root, conf):
""" Initialize a SwiftConnector
- :param root: The swift container that will act as Git bare repository
- :param conf: A ConfigParser Object
+ Args:
+ root: The swift container that will act as Git bare repository
+ conf: A ConfigParser Object
"""
self.conf = conf
self.auth_ver = self.conf.get("swift", "auth_ver")
if self.auth_ver not in ["1", "2"]:
raise NotImplementedError(
"Wrong authentication version use either 1 or 2")
self.auth_url = self.conf.get("swift", "auth_url")
self.user = self.conf.get("swift", "username")
self.password = self.conf.get("swift", "password")
self.concurrency = self.conf.getint('swift', 'concurrency') or 10
self.http_timeout = self.conf.getint('swift', 'http_timeout') or 20
self.http_pool_length = \
self.conf.getint('swift', 'http_pool_length') or 10
self.region_name = self.conf.get("swift", "region_name") or "RegionOne"
self.endpoint_type = \
self.conf.get("swift", "endpoint_type") or "internalURL"
self.cache_length = self.conf.getint("swift", "cache_length") or 20
self.chunk_length = self.conf.getint("swift", "chunk_length") or 12228
self.root = root
block_size = 1024 * 12 # 12KB
if self.auth_ver == "1":
self.storage_url, self.token = self.swift_auth_v1()
else:
self.storage_url, self.token = self.swift_auth_v2()
token_header = {'X-Auth-Token': str(self.token)}
self.httpclient = \
HTTPClient.from_url(str(self.storage_url),
concurrency=self.http_pool_length,
block_size=block_size,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
headers=token_header)
self.base_path = str(posixpath.join(
urlparse.urlparse(self.storage_url).path, self.root))
def swift_auth_v1(self):
self.user = self.user.replace(";", ":")
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
headers = {'X-Auth-User': self.user,
'X-Auth-Key': self.password}
path = urlparse.urlparse(self.auth_url).path
ret = auth_httpclient.request('GET', path, headers=headers)
# Should do something with redirections (301 in my case)
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v1.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
path, ret.status_code,
str(ret.items())))
storage_url = ret['X-Storage-Url']
token = ret['X-Auth-Token']
return storage_url, token
def swift_auth_v2(self):
self.tenant, self.user = self.user.split(';')
auth_dict = {}
auth_dict['auth'] = {'passwordCredentials':
{
'username': self.user,
'password': self.password,
},
'tenantName': self.tenant}
auth_json = json_dumps(auth_dict)
headers = {'Content-Type': 'application/json'}
auth_httpclient = HTTPClient.from_url(
self.auth_url,
connection_timeout=self.http_timeout,
network_timeout=self.http_timeout,
)
path = urlparse.urlparse(self.auth_url).path
if not path.endswith('tokens'):
path = posixpath.join(path, 'tokens')
ret = auth_httpclient.request('POST', path,
body=auth_json,
headers=headers)
if ret.status_code < 200 or ret.status_code >= 300:
raise SwiftException('AUTH v2.0 request failed on ' +
'%s with error code %s (%s)'
% (str(auth_httpclient.get_base_url()) +
path, ret.status_code,
str(ret.items())))
auth_ret_json = json_loads(ret.read())
token = auth_ret_json['access']['token']['id']
catalogs = auth_ret_json['access']['serviceCatalog']
object_store = [o_store for o_store in catalogs if
o_store['type'] == 'object-store'][0]
endpoints = object_store['endpoints']
endpoint = [endp for endp in endpoints if
endp["region"] == self.region_name][0]
return endpoint[self.endpoint_type], token
def test_root_exists(self):
"""Check that Swift container exist
:return: True if exist or None it not
"""
ret = self.httpclient.request('HEAD', self.base_path)
if ret.status_code == 404:
return None
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('HEAD request failed with error code %s'
% ret.status_code)
return True
def create_root(self):
"""Create the Swift container
:raise: `SwiftException` if unable to create
"""
if not self.test_root_exists():
ret = self.httpclient.request('PUT', self.base_path)
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('PUT request failed with error code %s'
% ret.status_code)
def get_container_objects(self):
"""Retrieve objects list in a container
:return: A list of dict that describe objects
or None if container does not exist
"""
qs = '?format=json'
path = self.base_path + qs
ret = self.httpclient.request('GET', path)
if ret.status_code == 404:
return None
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('GET request failed with error code %s'
% ret.status_code)
content = ret.read()
return json_loads(content)
def get_object_stat(self, name):
"""Retrieve object stat
- :param name: The object name
- :return: A dict that describe the object
- or None if object does not exist
+ Args:
+ name: The object name
+ Returns:
+ A dict describing the object, or None if the object does not exist
"""
path = self.base_path + '/' + name
ret = self.httpclient.request('HEAD', path)
if ret.status_code == 404:
return None
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('HEAD request failed with error code %s'
% ret.status_code)
resp_headers = {}
for header, value in ret.items():
resp_headers[header.lower()] = value
return resp_headers
def put_object(self, name, content):
"""Put an object
- :param name: The object name
- :param content: A file object
- :raise: `SwiftException` if unable to create
+ Args:
+ name: The object name
+ content: A file object
+ Raises:
+ SwiftException: if unable to create
"""
content.seek(0)
data = content.read()
path = self.base_path + '/' + name
headers = {'Content-Length': str(len(data))}
def _send():
ret = self.httpclient.request('PUT', path,
body=data,
headers=headers)
return ret
try:
# Sometimes we get Broken Pipe - dirty workaround
ret = _send()
except Exception:
# Retry once; the second attempt usually works
ret = _send()
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('PUT request failed with error code %s'
% ret.status_code)
def get_object(self, name, range=None):
"""Retrieve an object
- :param name: The object name
- :param range: A string range like "0-10" to
- retrieve specified bytes in object content
- :return: A file like instance
- or bytestring if range is specified
+ Args:
+ name: The object name
+ range: A string range like "0-10" to
+ retrieve the specified bytes of the object content
+ Returns:
+ A file-like instance, or a bytestring if range is specified
"""
headers = {}
if range:
headers['Range'] = 'bytes=%s' % range
path = self.base_path + '/' + name
ret = self.httpclient.request('GET', path, headers=headers)
if ret.status_code == 404:
return None
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('GET request failed with error code %s'
% ret.status_code)
content = ret.read()
if range:
return content
return BytesIO(content)
def del_object(self, name):
"""Delete an object
- :param name: The object name
- :raise: `SwiftException` if unable to delete
+ Args:
+ name: The object name
+ Raises:
+ SwiftException: if unable to delete
"""
path = self.base_path + '/' + name
ret = self.httpclient.request('DELETE', path)
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('DELETE request failed with error code %s'
% ret.status_code)
def del_root(self):
"""Delete the root container by removing container content
:raise: `SwiftException` if unable to delete
"""
for obj in self.get_container_objects():
self.del_object(obj['name'])
ret = self.httpclient.request('DELETE', self.base_path)
if ret.status_code < 200 or ret.status_code > 300:
raise SwiftException('DELETE request failed with error code %s'
% ret.status_code)
class SwiftPackReader(object):
"""A SwiftPackReader that mimic read and sync method
The reader allows to read a specified amount of bytes from
a given offset of a Swift object. A read offset is kept internaly.
The reader will read from Swift a specified amount of data to complete
its internal buffer. chunk_length specifiy the amount of data
to read from Swift.
"""
def __init__(self, scon, filename, pack_length):
"""Initialize a SwiftPackReader
- :param scon: a `SwiftConnector` instance
- :param filename: the pack filename
- :param pack_length: The size of the pack object
+ Args:
+ scon: a `SwiftConnector` instance
+ filename: the pack filename
+ pack_length: The size of the pack object
"""
self.scon = scon
self.filename = filename
self.pack_length = pack_length
self.offset = 0
self.base_offset = 0
self.buff = b''
self.buff_length = self.scon.chunk_length
def _read(self, more=False):
if more:
self.buff_length = self.buff_length * 2
offset = self.base_offset
r = min(self.base_offset + self.buff_length, self.pack_length)
ret = self.scon.get_object(self.filename, range="%s-%s" % (offset, r))
self.buff = ret
def read(self, length):
"""Read a specified amount of Bytes form the pack object
- :param length: amount of bytes to read
- :return: bytestring
+ Args:
+ length: amount of bytes to read
+ Returns:
+ a bytestring
"""
end = self.offset+length
if self.base_offset + end > self.pack_length:
data = self.buff[self.offset:]
self.offset = end
return data
if end > len(self.buff):
# Need to read more from swift
self._read(more=True)
return self.read(length)
data = self.buff[self.offset:end]
self.offset = end
return data
def seek(self, offset):
"""Seek to a specified offset
- :param offset: the offset to seek to
+ Args:
+ offset: the offset to seek to
"""
self.base_offset = offset
self._read()
self.offset = 0
def read_checksum(self):
"""Read the checksum from the pack
:return: the checksum bytestring
"""
return self.scon.get_object(self.filename, range="-20")
class SwiftPackData(PackData):
"""The data contained in a packfile.
We use the SwiftPackReader to read bytes from packs stored in Swift
using the Range header feature of Swift.
"""
def __init__(self, scon, filename):
""" Initialize a SwiftPackReader
- :param scon: a `SwiftConnector` instance
- :param filename: the pack filename
+ Args:
+ scon: a `SwiftConnector` instance
+ filename: the pack filename
"""
self.scon = scon
self._filename = filename
self._header_size = 12
headers = self.scon.get_object_stat(self._filename)
self.pack_length = int(headers['content-length'])
pack_reader = SwiftPackReader(self.scon, self._filename,
self.pack_length)
(version, self._num_objects) = read_pack_header(pack_reader.read)
self._offset_cache = LRUSizeCache(1024*1024*self.scon.cache_length,
compute_size=_compute_object_size)
self.pack = None
def get_object_at(self, offset):
if offset in self._offset_cache:
return self._offset_cache[offset]
assert offset >= self._header_size
pack_reader = SwiftPackReader(self.scon, self._filename,
self.pack_length)
pack_reader.seek(offset)
unpacked, _ = unpack_object(pack_reader.read)
return (unpacked.pack_type_num, unpacked._obj())
def get_stored_checksum(self):
pack_reader = SwiftPackReader(self.scon, self._filename,
self.pack_length)
return pack_reader.read_checksum()
def close(self):
pass
class SwiftPack(Pack):
"""A Git pack object.
Same implementation as pack.Pack except that _idx_load and
_data_load are bound to the Swift versions of load_pack_index and
PackData.
"""
def __init__(self, *args, **kwargs):
self.scon = kwargs['scon']
del kwargs['scon']
super(SwiftPack, self).__init__(*args, **kwargs)
self._pack_info_path = self._basename + '.info'
self._pack_info = None
self._pack_info_load = lambda: load_pack_info(self._pack_info_path,
self.scon)
self._idx_load = lambda: swift_load_pack_index(self.scon,
self._idx_path)
self._data_load = lambda: SwiftPackData(self.scon, self._data_path)
@property
def pack_info(self):
"""The pack data object being used."""
if self._pack_info is None:
self._pack_info = self._pack_info_load()
return self._pack_info
class SwiftObjectStore(PackBasedObjectStore):
"""A Swift Object Store
Allow to manage a bare Git repository from Openstack Swift.
This object store only supports pack files and not loose objects.
"""
def __init__(self, scon):
"""Open a Swift object store.
- :param scon: A `SwiftConnector` instance
+ Args:
+ scon: A `SwiftConnector` instance
"""
super(SwiftObjectStore, self).__init__()
self.scon = scon
self.root = self.scon.root
self.pack_dir = posixpath.join(OBJECTDIR, PACKDIR)
self._alternates = None
def _update_pack_cache(self):
objects = self.scon.get_container_objects()
pack_files = [o['name'].replace(".pack", "")
for o in objects if o['name'].endswith(".pack")]
ret = []
for basename in pack_files:
pack = SwiftPack(basename, scon=self.scon)
self._pack_cache[basename] = pack
ret.append(pack)
return ret
def _iter_loose_objects(self):
"""Loose objects are not supported by this repository
"""
return []
def iter_shas(self, finder):
"""An iterator over pack's ObjectStore.
:return: a `ObjectStoreIterator` or `GreenThreadsObjectStoreIterator`
instance if gevent is enabled
"""
shas = iter(finder.next, None)
return PackInfoObjectStoreIterator(
self, shas, finder, self.scon.concurrency)
def find_missing_objects(self, *args, **kwargs):
kwargs['concurrency'] = self.scon.concurrency
return PackInfoMissingObjectFinder(self, *args, **kwargs)
def pack_info_get(self, sha):
for pack in self.packs:
if sha in pack:
return pack.pack_info[sha]
def _collect_ancestors(self, heads, common=set()):
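        # Breadth-first walk over commit parents using the cached pack-info metadata.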
def _find_parents(commit):
for pack in self.packs:
if commit in pack:
try:
parents = pack.pack_info[commit][1]
except KeyError:
# Seems to have no parents
return []
return parents
bases = set()
commits = set()
queue = []
queue.extend(heads)
while queue:
e = queue.pop(0)
if e in common:
bases.add(e)
elif e not in commits:
commits.add(e)
parents = _find_parents(e)
queue.extend(parents)
return (commits, bases)
def add_pack(self):
"""Add a new pack to this object store.
:return: Fileobject to write to and a commit function to
call when the pack is finished.
"""
f = BytesIO()
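        # The new pack is buffered in memory; commit() below uploads the pack and its v2 index to Swift.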
def commit():
f.seek(0)
pack = PackData(file=f, filename="")
entries = pack.sorted_entries()
if len(entries):
basename = posixpath.join(self.pack_dir,
"pack-%s" %
iter_sha1(entry[0] for
entry in entries))
index = BytesIO()
write_pack_index_v2(index, entries, pack.get_stored_checksum())
self.scon.put_object(basename + ".pack", f)
f.close()
self.scon.put_object(basename + ".idx", index)
index.close()
final_pack = SwiftPack(basename, scon=self.scon)
final_pack.check_length_and_checksum()
self._add_cached_pack(basename, final_pack)
return final_pack
else:
return None
def abort():
pass
return f, commit, abort
def add_object(self, obj):
self.add_objects([(obj, None), ])
def _pack_cache_stale(self):
return False
def _get_loose_object(self, sha):
return None
def add_thin_pack(self, read_all, read_some):
"""Read a thin pack
Read it from a stream and complete it in a temporary file.
Then the pack and the corresponding index file are uploaded to Swift.
"""
fd, path = tempfile.mkstemp(prefix='tmp_pack_')
f = os.fdopen(fd, 'w+b')
try:
indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
copier = PackStreamCopier(read_all, read_some, f,
delta_iter=indexer)
copier.verify()
return self._complete_thin_pack(f, path, copier, indexer)
finally:
f.close()
os.unlink(path)
def _complete_thin_pack(self, f, path, copier, indexer):
entries = list(indexer)
# Update the header with the new number of objects.
f.seek(0)
write_pack_header(f, len(entries) + len(indexer.ext_refs()))
# Must flush before reading (http://bugs.python.org/issue3207)
f.flush()
# Rescan the rest of the pack, computing the SHA with the new header.
new_sha = compute_file_sha(f, end_ofs=-20)
# Must reposition before writing (http://bugs.python.org/issue3207)
f.seek(0, os.SEEK_CUR)
# Complete the pack.
for ext_sha in indexer.ext_refs():
assert len(ext_sha) == 20
type_num, data = self.get_raw(ext_sha)
offset = f.tell()
crc32 = write_pack_object(f, type_num, data, sha=new_sha)
entries.append((ext_sha, offset, crc32))
pack_sha = new_sha.digest()
f.write(pack_sha)
f.flush()
# Move the pack in.
entries.sort()
pack_base_name = posixpath.join(
self.pack_dir,
'pack-' + iter_sha1(e[0] for e in entries).decode(
sys.getfilesystemencoding()))
self.scon.put_object(pack_base_name + '.pack', f)
# Write the index.
filename = pack_base_name + '.idx'
index_file = BytesIO()
write_pack_index_v2(index_file, entries, pack_sha)
self.scon.put_object(filename, index_file)
# Write pack info.
f.seek(0)
pack_data = PackData(filename="", file=f)
index_file.seek(0)
pack_index = load_pack_index_file('', index_file)
serialized_pack_info = pack_info_create(pack_data, pack_index)
f.close()
index_file.close()
pack_info_file = BytesIO(serialized_pack_info)
filename = pack_base_name + '.info'
self.scon.put_object(filename, pack_info_file)
pack_info_file.close()
# Add the pack to the store and return it.
final_pack = SwiftPack(pack_base_name, scon=self.scon)
final_pack.check_length_and_checksum()
self._add_cached_pack(pack_base_name, final_pack)
return final_pack
class SwiftInfoRefsContainer(InfoRefsContainer):
"""Manage references in info/refs object.
"""
def __init__(self, scon, store):
self.scon = scon
self.filename = 'info/refs'
self.store = store
f = self.scon.get_object(self.filename)
if not f:
f = BytesIO(b'')
super(SwiftInfoRefsContainer, self).__init__(f)
def _load_check_ref(self, name, old_ref):
self._check_refname(name)
f = self.scon.get_object(self.filename)
if not f:
return {}
refs = read_info_refs(f)
if old_ref is not None:
if refs[name] != old_ref:
return False
return refs
def _write_refs(self, refs):
f = BytesIO()
f.writelines(write_info_refs(refs, self.store))
self.scon.put_object(self.filename, f)
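        # Note that every ref update rewrites the whole info/refs object in Swift.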
def set_if_equals(self, name, old_ref, new_ref):
"""Set a refname to new_ref only if it currently equals old_ref.
"""
if name == 'HEAD':
return True
refs = self._load_check_ref(name, old_ref)
if not isinstance(refs, dict):
return False
refs[name] = new_ref
self._write_refs(refs)
self._refs[name] = new_ref
return True
def remove_if_equals(self, name, old_ref):
"""Remove a refname only if it currently equals old_ref.
"""
if name == 'HEAD':
return True
refs = self._load_check_ref(name, old_ref)
if not isinstance(refs, dict):
return False
del refs[name]
self._write_refs(refs)
del self._refs[name]
return True
def allkeys(self):
try:
self._refs['HEAD'] = self._refs['refs/heads/master']
except KeyError:
pass
return self._refs.keys()
class SwiftRepo(BaseRepo):
def __init__(self, root, conf):
"""Init a Git bare Repository on top of a Swift container.
References are managed in info/refs objects by
`SwiftInfoRefsContainer`. The root attribute is the Swift
container that contains the Git bare repository.
- :param root: The container which contains the bare repo
- :param conf: A ConfigParser object
+ Args:
+ root: The container which contains the bare repo
+ conf: A ConfigParser object
"""
self.root = root.lstrip('/')
self.conf = conf
self.scon = SwiftConnector(self.root, self.conf)
objects = self.scon.get_container_objects()
if not objects:
raise Exception('There is not any GIT repo here : %s' % self.root)
objects = [o['name'].split('/')[0] for o in objects]
if OBJECTDIR not in objects:
raise Exception('This repository (%s) is not bare.' % self.root)
self.bare = True
self._controldir = self.root
object_store = SwiftObjectStore(self.scon)
refs = SwiftInfoRefsContainer(self.scon, object_store)
BaseRepo.__init__(self, object_store, refs)
def _determine_file_mode(self):
"""Probe the file-system to determine whether permissions can be trusted.
:return: True if permissions can be trusted, False otherwise.
"""
return False
def _put_named_file(self, filename, contents):
"""Put an object in a Swift container
- :param filename: the path to the object to put on Swift
- :param contents: the content as bytestring
+ Args:
+ filename: the path to the object to put on Swift
+ contents: the content as bytestring
"""
with BytesIO() as f:
f.write(contents)
self.scon.put_object(filename, f)
@classmethod
def init_bare(cls, scon, conf):
"""Create a new bare repository.
- :param scon: a `SwiftConnector` instance
- :param conf: a ConfigParser object
- :return: a `SwiftRepo` instance
+ Args:
+ scon: a `SwiftConnector` instance
+ conf: a ConfigParser object
+ Returns:
+ a `SwiftRepo` instance
"""
scon.create_root()
for obj in [posixpath.join(OBJECTDIR, PACKDIR),
posixpath.join(INFODIR, 'refs')]:
scon.put_object(obj, BytesIO(b''))
ret = cls(scon.root, conf)
ret._init_files(True)
return ret
class SwiftSystemBackend(Backend):
def __init__(self, logger, conf):
self.conf = conf
self.logger = logger
def open_repository(self, path):
self.logger.info('opening repository at %s', path)
return SwiftRepo(path, self.conf)
def cmd_daemon(args):
"""Entry point for starting a TCP git server."""
import optparse
parser = optparse.OptionParser()
parser.add_option("-l", "--listen_address", dest="listen_address",
default="127.0.0.1",
help="Binding IP address.")
parser.add_option("-p", "--port", dest="port", type=int,
default=TCP_GIT_PORT,
help="Binding TCP port.")
parser.add_option("-c", "--swift_config", dest="swift_config",
default="",
help="Path to the configuration file for Swift backend.")
options, args = parser.parse_args(args)
try:
import gevent
import geventhttpclient # noqa: F401
except ImportError:
print("gevent and geventhttpclient libraries are mandatory "
" for use the Swift backend.")
sys.exit(1)
import gevent.monkey
gevent.monkey.patch_socket()
from dulwich import log_utils
logger = log_utils.getLogger(__name__)
conf = load_conf(options.swift_config)
backend = SwiftSystemBackend(logger, conf)
log_utils.default_logging_config()
server = TCPGitServer(backend, options.listen_address,
port=options.port)
server.serve_forever()
def cmd_init(args):
import optparse
parser = optparse.OptionParser()
parser.add_option("-c", "--swift_config", dest="swift_config",
default="",
help="Path to the configuration file for Swift backend.")
options, args = parser.parse_args(args)
conf = load_conf(options.swift_config)
if args == []:
parser.error("missing repository name")
repo = args[0]
scon = SwiftConnector(repo, conf)
SwiftRepo.init_bare(scon, conf)
def main(argv=sys.argv):
commands = {
"init": cmd_init,
"daemon": cmd_daemon,
}
if len(sys.argv) < 2:
print("Usage: %s <%s> [OPTIONS...]" % (
sys.argv[0], "|".join(commands.keys())))
sys.exit(1)
cmd = sys.argv[1]
if cmd not in commands:
print("No such subcommand: %s" % cmd)
sys.exit(1)
commands[cmd](sys.argv[2:])
if __name__ == '__main__':
main()
diff --git a/dulwich/ignore.py b/dulwich/ignore.py
index 51dcf178..a04d29dc 100644
--- a/dulwich/ignore.py
+++ b/dulwich/ignore.py
@@ -1,362 +1,374 @@
# Copyright (C) 2017 Jelmer Vernooij
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Parsing of gitignore files.
For details of the matching rules, see https://git-scm.com/docs/gitignore
"""
import os.path
import re
import sys
def _translate_segment(segment):
if segment == b"*":
return b'[^/]+'
res = b""
i, n = 0, len(segment)
while i < n:
c = segment[i:i+1]
i = i+1
if c == b'*':
res += b'[^/]*'
elif c == b'?':
res += b'[^/]'
elif c == b'[':
j = i
if j < n and segment[j:j+1] == b'!':
j = j+1
if j < n and segment[j:j+1] == b']':
j = j+1
while j < n and segment[j:j+1] != b']':
j = j+1
if j >= n:
res += b'\\['
else:
stuff = segment[i:j].replace(b'\\', b'\\\\')
i = j+1
if stuff.startswith(b'!'):
stuff = b'^' + stuff[1:]
elif stuff.startswith(b'^'):
stuff = b'\\' + stuff
res += b'[' + stuff + b']'
else:
res += re.escape(c)
return res
def translate(pat):
"""Translate a shell PATTERN to a regular expression.
There is no way to quote meta-characters.
Originally copied from fnmatch in Python 2.7, but modified for Dulwich
to cope with features in Git ignore patterns.
"""
res = b'(?ms)'
if b'/' not in pat[:-1]:
# If there's no slash, this is a filename-based match
res += b'(.*/)?'
if pat.startswith(b'**/'):
# Leading **/
pat = pat[2:]
res += b'(.*/)?'
if pat.startswith(b'/'):
pat = pat[1:]
for i, segment in enumerate(pat.split(b'/')):
if segment == b'**':
res += b'(/.*)?'
continue
else:
res += ((re.escape(b'/') if i > 0 else b'') +
_translate_segment(segment))
if not pat.endswith(b'/'):
res += b'/?'
return res + b'\\Z'
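# For instance, translate(b'*.o') yields b'(?ms)(.*/)?[^/]*\.o/?\Z',
# matching "foo.o" in any directory but never across a slash.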
def read_ignore_patterns(f):
"""Read a git ignore file.
Args:
f: File-like object to read from
Returns: List of patterns
"""
for line in f:
line = line.rstrip(b"\r\n")
# Ignore blank lines, they're used for readability.
if not line:
continue
if line.startswith(b'#'):
# Comment
continue
# Trailing spaces are ignored unless they are quoted with a backslash.
while line.endswith(b' ') and not line.endswith(b'\\ '):
line = line[:-1]
line = line.replace(b'\\ ', b' ')
yield line
def match_pattern(path, pattern, ignorecase=False):
"""Match a gitignore-style pattern against a path.
Args:
path: Path to match
pattern: Pattern to match
ignorecase: Whether to do case-insensitive matching
Returns:
bool indicating whether the pattern matched
"""
return Pattern(pattern, ignorecase).match(path)
class Pattern(object):
"""A single ignore pattern."""
def __init__(self, pattern, ignorecase=False):
self.pattern = pattern
self.ignorecase = ignorecase
if pattern[0:1] == b'!':
self.is_exclude = False
pattern = pattern[1:]
else:
if pattern[0:1] == b'\\':
pattern = pattern[1:]
self.is_exclude = True
flags = 0
if self.ignorecase:
flags = re.IGNORECASE
self._re = re.compile(translate(pattern), flags)
def __bytes__(self):
return self.pattern
def __str__(self):
return self.pattern.decode(sys.getfilesystemencoding())
def __eq__(self, other):
return (type(self) == type(other) and
self.pattern == other.pattern and
self.ignorecase == other.ignorecase)
def __repr__(self):
return "%s(%s, %r)" % (
type(self).__name__, self.pattern, self.ignorecase)
def match(self, path):
"""Try to match a path against this ignore pattern.
Args:
path: Path to match (relative to ignore location)
Returns: boolean
"""
return bool(self._re.match(path))
class IgnoreFilter(object):
def __init__(self, patterns, ignorecase=False):
self._patterns = []
self._ignorecase = ignorecase
for pattern in patterns:
self.append_pattern(pattern)
def append_pattern(self, pattern):
"""Add a pattern to the set."""
self._patterns.append(Pattern(pattern, self._ignorecase))
def find_matching(self, path):
"""Yield all matching patterns for path.
- :param path: Path to match
- :return: Iterator over iterators
+ Args:
+ path: Path to match
+ Returns:
+ Iterator over the matching patterns
"""
if not isinstance(path, bytes):
path = path.encode(sys.getfilesystemencoding())
for pattern in self._patterns:
if pattern.match(path):
yield pattern
def is_ignored(self, path):
"""Check whether a path is ignored.
For directories, include a trailing slash.
:return: status is None if file is not mentioned, True if it is
included, False if it is explicitly excluded.
"""
status = None
for pattern in self.find_matching(path):
status = pattern.is_exclude
return status
@classmethod
def from_path(cls, path, ignorecase=False):
with open(path, 'rb') as f:
ret = cls(read_ignore_patterns(f), ignorecase)
ret._path = path
return ret
def __repr__(self):
if getattr(self, '_path', None) is None:
return "<%s>" % (type(self).__name__)
else:
return "%s.from_path(%r)" % (type(self).__name__, self._path)
class IgnoreFilterStack(object):
"""Check for ignore status in multiple filters."""
def __init__(self, filters):
self._filters = filters
def is_ignored(self, path):
"""Check whether a path is explicitly included or excluded in ignores.
- :param path: Path to check
- :return: None if the file is not mentioned, True if it is included,
- False if it is explicitly excluded.
+ Args:
+ path: Path to check
+ Returns:
+ None if the file is not mentioned, True if it is included,
+ False if it is explicitly excluded.
"""
status = None
for filter in self._filters:
status = filter.is_ignored(path)
if status is not None:
return status
return status
def default_user_ignore_filter_path(config):
"""Return default user ignore filter path.
- :param config: A Config object
- :return: Path to a global ignore file
+ Args:
+ config: A Config object
+ Returns:
+ Path to a global ignore file
"""
try:
return config.get((b'core', ), b'excludesFile')
except KeyError:
pass
xdg_config_home = os.environ.get(
"XDG_CONFIG_HOME", os.path.expanduser("~/.config/"),
)
return os.path.join(xdg_config_home, 'git', 'ignore')
class IgnoreFilterManager(object):
"""Ignore file manager."""
def __init__(self, top_path, global_filters, ignorecase):
self._path_filters = {}
self._top_path = top_path
self._global_filters = global_filters
self._ignorecase = ignorecase
def __repr__(self):
return "%s(%s, %r, %r)" % (
type(self).__name__, self._top_path,
self._global_filters,
self._ignorecase)
def _load_path(self, path):
try:
return self._path_filters[path]
except KeyError:
pass
p = os.path.join(self._top_path, path, '.gitignore')
try:
self._path_filters[path] = IgnoreFilter.from_path(
p, self._ignorecase)
except IOError:
self._path_filters[path] = None
return self._path_filters[path]
def find_matching(self, path):
"""Find matching patterns for path.
Stops after the first ignore file with matches.
- :param path: Path to check
- :return: Iterator over Pattern instances
+ Args:
+ path: Path to check
+ Returns:
+ Iterator over Pattern instances
"""
if os.path.isabs(path):
raise ValueError('%s is an absolute path' % path)
filters = [(0, f) for f in self._global_filters]
if os.path.sep != '/':
path = path.replace(os.path.sep, '/')
parts = path.split('/')
for i in range(len(parts)+1):
dirname = '/'.join(parts[:i])
for s, f in filters:
relpath = '/'.join(parts[s:i])
if i < len(parts):
# Paths leading up to the final part are all directories,
# so need a trailing slash.
relpath += '/'
matches = list(f.find_matching(relpath))
if matches:
return iter(matches)
ignore_filter = self._load_path(dirname)
if ignore_filter is not None:
filters.insert(0, (i, ignore_filter))
return iter([])
def is_ignored(self, path):
"""Check whether a path is explicitly included or excluded in ignores.
- :param path: Path to check
- :return: None if the file is not mentioned, True if it is included,
- False if it is explicitly excluded.
+ Args:
+ path: Path to check
+ Returns:
+ None if the file is not mentioned, True if it is included,
+ False if it is explicitly excluded.
"""
matches = list(self.find_matching(path))
if matches:
return matches[-1].is_exclude
return None
@classmethod
def from_repo(cls, repo):
"""Create a IgnoreFilterManager from a repository.
- :param repo: Repository object
- :return: A `IgnoreFilterManager` object
+ Args:
+ repo: Repository object
+ Returns:
+ An `IgnoreFilterManager` object
"""
global_filters = []
for p in [
os.path.join(repo.controldir(), 'info', 'exclude'),
default_user_ignore_filter_path(repo.get_config_stack())]:
try:
global_filters.append(IgnoreFilter.from_path(p))
except IOError:
pass
config = repo.get_config_stack()
ignorecase = config.get_boolean((b'core'), (b'ignorecase'), False)
return cls(repo.path, global_filters, ignorecase)
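# Typical use, as a sketch (assuming `repo` is an open dulwich Repo):
#   IgnoreFilterManager.from_repo(repo).is_ignored('build/')
# Directories are queried with a trailing slash, as noted in IgnoreFilter.is_ignored.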
diff --git a/dulwich/index.py b/dulwich/index.py
index bc61ce98..b938e410 100644
--- a/dulwich/index.py
+++ b/dulwich/index.py
@@ -1,809 +1,818 @@
# index.py -- File parser/writer for the git index file
# Copyright (C) 2008-2013 Jelmer Vernooij
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Parser for the git index file format."""
import collections
import errno
import os
import stat
import struct
import sys
from dulwich.file import GitFile
from dulwich.objects import (
Blob,
S_IFGITLINK,
S_ISGITLINK,
Tree,
hex_to_sha,
sha_to_hex,
)
from dulwich.pack import (
SHA1Reader,
SHA1Writer,
)
IndexEntry = collections.namedtuple(
'IndexEntry', [
'ctime', 'mtime', 'dev', 'ino', 'mode', 'uid', 'gid', 'size', 'sha',
'flags'])
FLAG_STAGEMASK = 0x3000
FLAG_VALID = 0x8000
FLAG_EXTENDED = 0x4000
def pathsplit(path):
"""Split a /-delimited path into a directory part and a basename.
- :param path: The path to split.
- :return: Tuple with directory name and basename
+ Args:
+ path: The path to split.
+ Returns:
+ Tuple with directory name and basename
"""
try:
(dirname, basename) = path.rsplit(b"/", 1)
except ValueError:
return (b"", path)
else:
return (dirname, basename)
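# For example: pathsplit(b'foo/bar/baz') == (b'foo/bar', b'baz'); pathsplit(b'baz') == (b'', b'baz').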
def pathjoin(*args):
"""Join a /-delimited path.
"""
return b"/".join([p for p in args if p])
def read_cache_time(f):
"""Read a cache time.
- :param f: File-like object to read from
- :return: Tuple with seconds and nanoseconds
+ Args:
+ f: File-like object to read from
+ Returns:
+ Tuple with seconds and nanoseconds
"""
return struct.unpack(">LL", f.read(8))
def write_cache_time(f, t):
"""Write a cache time.
- :param f: File-like object to write to
- :param t: Time to write (as int, float or tuple with secs and nsecs)
+ Args:
+ f: File-like object to write to
+ t: Time to write (as int, float or tuple with secs and nsecs)
"""
if isinstance(t, int):
t = (t, 0)
elif isinstance(t, float):
(secs, nsecs) = divmod(t, 1.0)
t = (int(secs), int(nsecs * 1000000000))
elif not isinstance(t, tuple):
raise TypeError(t)
f.write(struct.pack(">LL", *t))
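# For example, a float t=1234.5 is stored as the pair (1234, 500000000).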
def read_cache_entry(f):
"""Read an entry from a cache file.
- :param f: File-like object to read from
- :return: tuple with: device, inode, mode, uid, gid, size, sha, flags
+ Args:
+ f: File-like object to read from
+ Returns:
+ tuple with: device, inode, mode, uid, gid, size, sha, flags
"""
beginoffset = f.tell()
ctime = read_cache_time(f)
mtime = read_cache_time(f)
(dev, ino, mode, uid, gid, size, sha, flags, ) = \
struct.unpack(">LLLLLL20sH", f.read(20 + 4 * 6 + 2))
name = f.read((flags & 0x0fff))
# Padding:
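# Entries are NUL-padded (1-8 bytes) so their total length is a multiple of eight.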
real_size = ((f.tell() - beginoffset + 8) & ~7)
f.read((beginoffset + real_size) - f.tell())
return (name, ctime, mtime, dev, ino, mode, uid, gid, size,
sha_to_hex(sha), flags & ~0x0fff)
def write_cache_entry(f, entry):
"""Write an index entry to a file.
- :param f: File object
- :param entry: Entry to write, tuple with:
+ Args:
+ f: File object
+ entry: Entry to write, tuple with:
(name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags)
"""
beginoffset = f.tell()
(name, ctime, mtime, dev, ino, mode, uid, gid, size, sha, flags) = entry
write_cache_time(f, ctime)
write_cache_time(f, mtime)
flags = len(name) | (flags & ~0x0fff)
f.write(struct.pack(
b'>LLLLLL20sH', dev & 0xFFFFFFFF, ino & 0xFFFFFFFF,
mode, uid, gid, size, hex_to_sha(sha), flags))
f.write(name)
real_size = ((f.tell() - beginoffset + 8) & ~7)
f.write(b'\0' * ((beginoffset + real_size) - f.tell()))
def read_index(f):
"""Read an index file, yielding the individual entries."""
header = f.read(4)
if header != b'DIRC':
raise AssertionError("Invalid index file header: %r" % header)
(version, num_entries) = struct.unpack(b'>LL', f.read(4 * 2))
assert version in (1, 2)
for i in range(num_entries):
yield read_cache_entry(f)
def read_index_dict(f):
"""Read an index file and return it as a dictionary.
- :param f: File object to read from
+ Args:
+ f: File object to read from
"""
ret = {}
for x in read_index(f):
ret[x[0]] = IndexEntry(*x[1:])
return ret
def write_index(f, entries):
"""Write an index file.
:param f: File-like object to write to
:param entries: Iterable over the entries to write
"""
f.write(b'DIRC')
f.write(struct.pack(b'>LL', 2, len(entries)))
for x in entries:
write_cache_entry(f, x)
def write_index_dict(f, entries):
"""Write an index file based on the contents of a dictionary.
"""
entries_list = []
for name in sorted(entries):
entries_list.append((name,) + tuple(entries[name]))
write_index(f, entries_list)
def cleanup_mode(mode):
"""Cleanup a mode value.
This will return a mode that can be stored in a tree object.
:param mode: Mode to clean up.
"""
if stat.S_ISLNK(mode):
return stat.S_IFLNK
elif stat.S_ISDIR(mode):
return stat.S_IFDIR
elif S_ISGITLINK(mode):
return S_IFGITLINK
ret = stat.S_IFREG | 0o644
ret |= (mode & 0o111)
return ret
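# For example: cleanup_mode(0o100664) == 0o100644 and cleanup_mode(0o100775) == 0o100755.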
class Index(object):
"""A Git Index file."""
def __init__(self, filename):
"""Open an index file.
:param filename: Path to the index file
"""
self._filename = filename
self.clear()
self.read()
@property
def path(self):
return self._filename
def __repr__(self):
return "%s(%r)" % (self.__class__.__name__, self._filename)
def write(self):
"""Write current contents of index to disk."""
f = GitFile(self._filename, 'wb')
try:
f = SHA1Writer(f)
write_index_dict(f, self._byname)
finally:
f.close()
def read(self):
"""Read current contents of index from disk."""
if not os.path.exists(self._filename):
return
f = GitFile(self._filename, 'rb')
try:
f = SHA1Reader(f)
for x in read_index(f):
self[x[0]] = IndexEntry(*x[1:])
# FIXME: Additional data?
f.read(os.path.getsize(self._filename)-f.tell()-20)
f.check_sha()
finally:
f.close()
def __len__(self):
"""Number of entries in this index file."""
return len(self._byname)
def __getitem__(self, name):
"""Retrieve entry by relative path.
:return: tuple with (ctime, mtime, dev, ino, mode, uid, gid, size, sha,
flags)
"""
return self._byname[name]
def __iter__(self):
"""Iterate over the paths in this index."""
return iter(self._byname)
def get_sha1(self, path):
"""Return the (git object) SHA1 for the object at a path."""
return self[path].sha
def get_mode(self, path):
"""Return the POSIX file mode for the object at a path."""
return self[path].mode
def iterobjects(self):
"""Iterate over path, sha, mode tuples for use with commit_tree."""
for path in self:
entry = self[path]
yield path, entry.sha, cleanup_mode(entry.mode)
def iterblobs(self):
import warnings
warnings.warn('Use iterobjects() instead.', PendingDeprecationWarning)
return self.iterobjects()
def clear(self):
"""Remove all contents from this index."""
self._byname = {}
def __setitem__(self, name, x):
assert isinstance(name, bytes)
assert len(x) == 10
# Remove the old entry if any
self._byname[name] = IndexEntry(*x)
def __delitem__(self, name):
assert isinstance(name, bytes)
del self._byname[name]
def iteritems(self):
return self._byname.items()
def items(self):
return self._byname.items()
def update(self, entries):
for name, value in entries.items():
self[name] = value
def changes_from_tree(self, object_store, tree, want_unchanged=False):
"""Find the differences between the contents of this index and a tree.
:param object_store: Object store to use for retrieving tree contents
:param tree: SHA1 of the root tree
:param want_unchanged: Whether unchanged files should be reported
:return: Iterator over tuples with (oldpath, newpath), (oldmode,
newmode), (oldsha, newsha)
"""
def lookup_entry(path):
entry = self[path]
return entry.sha, entry.mode
for (name, mode, sha) in changes_from_tree(
self._byname.keys(), lookup_entry, object_store, tree,
want_unchanged=want_unchanged):
yield (name, mode, sha)
def commit(self, object_store):
"""Create a new tree from an index.
:param object_store: Object store to save the tree in
:return: Root tree SHA
"""
return commit_tree(object_store, self.iterobjects())
def commit_tree(object_store, blobs):
"""Commit a new tree.
:param object_store: Object store to add trees to
:param blobs: Iterable over blob path, sha, mode entries
:return: SHA1 of the created tree.
"""
trees = {b'': {}}
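    # trees maps each directory path to a dict of its entries; b'' is the root tree.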
def add_tree(path):
if path in trees:
return trees[path]
dirname, basename = pathsplit(path)
t = add_tree(dirname)
assert isinstance(basename, bytes)
newtree = {}
t[basename] = newtree
trees[path] = newtree
return newtree
for path, sha, mode in blobs:
tree_path, basename = pathsplit(path)
tree = add_tree(tree_path)
tree[basename] = (mode, sha)
def build_tree(path):
tree = Tree()
for basename, entry in trees[path].items():
if isinstance(entry, dict):
mode = stat.S_IFDIR
sha = build_tree(pathjoin(path, basename))
else:
(mode, sha) = entry
tree.add(basename, mode, sha)
object_store.add_object(tree)
return tree.id
return build_tree(b'')
def commit_index(object_store, index):
"""Create a new tree from an index.
:param object_store: Object store to save the tree in
:param index: Index file
:note: This function is deprecated, use index.commit() instead.
:return: Root tree sha.
"""
return commit_tree(object_store, index.iterobjects())
def changes_from_tree(names, lookup_entry, object_store, tree,
want_unchanged=False):
"""Find the differences between the contents of a tree and
a working copy.
:param names: Iterable of names in the working copy
:param lookup_entry: Function to lookup an entry in the working copy
:param object_store: Object store to use for retrieving tree contents
:param tree: SHA1 of the root tree, or None for an empty tree
:param want_unchanged: Whether unchanged files should be reported
:return: Iterator over tuples with (oldpath, newpath), (oldmode, newmode),
(oldsha, newsha)
"""
# TODO(jelmer): Support an include_trees option
other_names = set(names)
if tree is not None:
for (name, mode, sha) in object_store.iter_tree_contents(tree):
try:
(other_sha, other_mode) = lookup_entry(name)
except KeyError:
# Was removed
yield ((name, None), (mode, None), (sha, None))
else:
other_names.remove(name)
if (want_unchanged or other_sha != sha or other_mode != mode):
yield ((name, name), (mode, other_mode), (sha, other_sha))
# Mention added files
for name in other_names:
try:
(other_sha, other_mode) = lookup_entry(name)
except KeyError:
pass
else:
yield ((None, name), (None, other_mode), (None, other_sha))
def index_entry_from_stat(stat_val, hex_sha, flags, mode=None):
"""Create a new index entry from a stat value.
:param stat_val: POSIX stat_result instance
:param hex_sha: Hex sha of the object
:param flags: Index flags
"""
if mode is None:
mode = cleanup_mode(stat_val.st_mode)
return IndexEntry(
stat_val.st_ctime, stat_val.st_mtime, stat_val.st_dev,
stat_val.st_ino, mode, stat_val.st_uid,
stat_val.st_gid, stat_val.st_size, hex_sha, flags)
def build_file_from_blob(blob, mode, target_path, honor_filemode=True):
"""Build a file or symlink on disk based on a Git object.
:param blob: The git blob object
:param mode: File mode
:param target_path: Path to write to
:param honor_filemode: An optional flag to honor the core.filemode setting
in the config file; default is True, in which case the executable bit is applied
:return: stat object for the file
"""
try:
oldstat = os.lstat(target_path)
except OSError as e:
if e.errno == errno.ENOENT:
oldstat = None
else:
raise
contents = blob.as_raw_string()
if stat.S_ISLNK(mode):
# FIXME: This will fail on Windows. What should we do instead?
if oldstat:
os.unlink(target_path)
if sys.platform == 'win32' and sys.version_info[0] == 3:
# os.readlink on Python3 on Windows requires a unicode string.
# TODO(jelmer): Don't assume tree_encoding == fs_encoding
tree_encoding = sys.getfilesystemencoding()
contents = contents.decode(tree_encoding)
target_path = target_path.decode(tree_encoding)
os.symlink(contents, target_path)
else:
if oldstat is not None and oldstat.st_size == len(contents):
with open(target_path, 'rb') as f:
if f.read() == contents:
return oldstat
with open(target_path, 'wb') as f:
# Write out file
f.write(contents)
if honor_filemode:
os.chmod(target_path, mode)
return os.lstat(target_path)
INVALID_DOTNAMES = (b".git", b".", b"..", b"")
def validate_path_element_default(element):
return element.lower() not in INVALID_DOTNAMES
def validate_path_element_ntfs(element):
stripped = element.rstrip(b". ").lower()
if stripped in INVALID_DOTNAMES:
return False
if stripped == b"git~1":
return False
return True
def validate_path(path, element_validator=validate_path_element_default):
"""Default path validator that just checks for .git/."""
parts = path.split(b"/")
for p in parts:
if not element_validator(p):
return False
else:
return True
def build_index_from_tree(root_path, index_path, object_store, tree_id,
honor_filemode=True,
validate_path_element=validate_path_element_default):
"""Generate and materialize index from a tree
:param tree_id: Tree to materialize
:param root_path: Target dir for materialized index files
:param index_path: Target path for generated index
:param object_store: Non-empty object store holding tree contents
:param honor_filemode: An optional flag to honor the core.filemode setting
in the config file; default is True, in which case the executable bit is applied
:param validate_path_element: Function to validate path elements to check
out; default just refuses .git and .. directories.
:note: Existing index is wiped and contents are not merged
in a working dir. Suitable only for fresh clones.
"""
index = Index(index_path)
if not isinstance(root_path, bytes):
root_path = root_path.encode(sys.getfilesystemencoding())
for entry in object_store.iter_tree_contents(tree_id):
if not validate_path(entry.path, validate_path_element):
continue
full_path = _tree_to_fs_path(root_path, entry.path)
if not os.path.exists(os.path.dirname(full_path)):
os.makedirs(os.path.dirname(full_path))
# TODO(jelmer): Merge new index into working tree
if S_ISGITLINK(entry.mode):
if not os.path.isdir(full_path):
os.mkdir(full_path)
st = os.lstat(full_path)
# TODO(jelmer): record and return submodule paths
else:
obj = object_store[entry.sha]
st = build_file_from_blob(
obj, entry.mode, full_path, honor_filemode=honor_filemode)
# Add file to index
if not honor_filemode or S_ISGITLINK(entry.mode):
            # we cannot use tuple slicing to build a new tuple,
            # because on Windows that will convert the times to
            # longs, which causes errors further along
st_tuple = (entry.mode, st.st_ino, st.st_dev, st.st_nlink,
st.st_uid, st.st_gid, st.st_size, st.st_atime,
st.st_mtime, st.st_ctime)
st = st.__class__(st_tuple)
index[entry.path] = index_entry_from_stat(st, entry.sha, 0)
index.write()
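# Usage sketch (fresh-clone scenario): materialize HEAD's tree into an empty
# working directory; "repo" is an assumed dulwich Repo instance.
#
#     tree_id = repo[repo.head()].tree
#     build_index_from_tree(repo.path, repo.index_path(),
#                           repo.object_store, tree_id)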
def blob_from_path_and_stat(fs_path, st):
"""Create a blob from a path and a stat object.
:param fs_path: Full file system path to file
:param st: A stat object
:return: A `Blob` object
"""
assert isinstance(fs_path, bytes)
blob = Blob()
if not stat.S_ISLNK(st.st_mode):
with open(fs_path, 'rb') as f:
blob.data = f.read()
else:
if sys.platform == 'win32' and sys.version_info[0] == 3:
# os.readlink on Python3 on Windows requires a unicode string.
# TODO(jelmer): Don't assume tree_encoding == fs_encoding
tree_encoding = sys.getfilesystemencoding()
fs_path = fs_path.decode(tree_encoding)
blob.data = os.readlink(fs_path).encode(tree_encoding)
else:
blob.data = os.readlink(fs_path)
return blob
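# Usage sketch: hash a working-tree file the same way the index machinery
# does; the path is hypothetical and must be bytes.
#
#     path = b'/tmp/checkout/file.txt'
#     blob = blob_from_path_and_stat(path, os.lstat(path))
#     print(blob.id)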
def read_submodule_head(path):
"""Read the head commit of a submodule.
:param path: path to the submodule
:return: HEAD sha, None if not a valid head/repository
"""
from dulwich.errors import NotGitRepository
from dulwich.repo import Repo
# Repo currently expects a "str", so decode if necessary.
# TODO(jelmer): Perhaps move this into Repo() ?
if not isinstance(path, str):
path = path.decode(sys.getfilesystemencoding())
try:
repo = Repo(path)
except NotGitRepository:
return None
try:
return repo.head()
except KeyError:
return None
def _has_directory_changed(tree_path, entry):
"""Check if a directory has changed after getting an error.
When handling an error trying to create a blob from a path, call this
function. It will check if the path is a directory. If it's a directory
and a submodule, check the submodule head to see if it's has changed. If
not, consider the file as changed as Git tracked a file and not a
directory.
Return true if the given path should be considered as changed and False
otherwise or if the path is not a directory.
"""
# This is actually a directory
if os.path.exists(os.path.join(tree_path, b'.git')):
# Submodule
head = read_submodule_head(tree_path)
if entry.sha != head:
return True
else:
# The file was changed to a directory, so consider it removed.
return True
return False
def get_unstaged_changes(index, root_path, filter_blob_callback=None):
"""Walk through an index and check for differences against working tree.
:param index: index to check
    :param root_path: path in which to find files
    :param filter_blob_callback: Optional callback to filter blobs before
        comparing them against the index
:return: iterator over paths with unstaged changes
"""
# For each entry in the index check the sha1 & ensure not staged
if not isinstance(root_path, bytes):
root_path = root_path.encode(sys.getfilesystemencoding())
for tree_path, entry in index.iteritems():
full_path = _tree_to_fs_path(root_path, tree_path)
try:
st = os.lstat(full_path)
if stat.S_ISDIR(st.st_mode):
if _has_directory_changed(tree_path, entry):
yield tree_path
continue
blob = blob_from_path_and_stat(full_path, st)
if filter_blob_callback is not None:
blob = filter_blob_callback(blob, tree_path)
except EnvironmentError as e:
if e.errno == errno.ENOENT:
# The file was removed, so we assume that counts as
# different from whatever file used to exist.
yield tree_path
else:
raise
else:
if blob.id != entry.sha:
yield tree_path
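# Usage sketch: list paths whose working-tree contents differ from the index;
# "repo" is an assumed dulwich Repo instance.
#
#     index = repo.open_index()
#     for tree_path in get_unstaged_changes(index, repo.path):
#         print(tree_path)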
os_sep_bytes = os.sep.encode('ascii')
def _tree_to_fs_path(root_path, tree_path):
"""Convert a git tree path to a file system path.
:param root_path: Root filesystem path
:param tree_path: Git tree path as bytes
:return: File system path.
"""
assert isinstance(tree_path, bytes)
if os_sep_bytes != b'/':
sep_corrected_path = tree_path.replace(b'/', os_sep_bytes)
else:
sep_corrected_path = tree_path
return os.path.join(root_path, sep_corrected_path)
def _fs_to_tree_path(fs_path, fs_encoding=None):
"""Convert a file system path to a git tree path.
:param fs_path: File system path.
:param fs_encoding: File system encoding
:return: Git tree path as bytes
"""
if fs_encoding is None:
fs_encoding = sys.getfilesystemencoding()
if not isinstance(fs_path, bytes):
fs_path_bytes = fs_path.encode(fs_encoding)
else:
fs_path_bytes = fs_path
if os_sep_bytes != b'/':
tree_path = fs_path_bytes.replace(os_sep_bytes, b'/')
else:
tree_path = fs_path_bytes
return tree_path
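# Usage sketch: the two helpers above are inverses up to the OS separator,
# e.g. on Windows b'docs\\readme.md' <-> b'docs/readme.md'; on POSIX systems
# they are no-ops apart from encoding.
#
#     _fs_to_tree_path(os.path.join(b'docs', b'readme.md'))  # b'docs/readme.md'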
def index_entry_from_path(path, object_store=None):
"""Create an index from a filesystem path.
    This returns an index entry for files, symlinks
    and tree references. For directories and
    non-existent files it returns None
:param path: Path to create an index entry for
:param object_store: Optional object store to
save new blobs in
:return: An index entry; None for directories
"""
assert isinstance(path, bytes)
st = os.lstat(path)
if stat.S_ISDIR(st.st_mode):
if os.path.exists(os.path.join(path, b'.git')):
head = read_submodule_head(path)
if head is None:
return None
return index_entry_from_stat(
st, head, 0, mode=S_IFGITLINK)
return None
blob = blob_from_path_and_stat(path, st)
if object_store is not None:
object_store.add_object(blob)
return index_entry_from_stat(st, blob.id, 0)
def iter_fresh_entries(paths, root_path, object_store=None):
"""Iterate over current versions of index entries on disk.
:param paths: Paths to iterate over
:param root_path: Root path to access from
    :param object_store: Optional store to save new blobs in
:return: Iterator over path, index_entry
"""
for path in paths:
p = _tree_to_fs_path(root_path, path)
try:
entry = index_entry_from_path(p, object_store=object_store)
except EnvironmentError as e:
if e.errno in (errno.ENOENT, errno.EISDIR):
entry = None
else:
raise
yield path, entry
def iter_fresh_blobs(index, root_path):
"""Iterate over versions of blobs on disk referenced by index.
Don't use this function; it removes missing entries from index.
:param index: Index file
:param root_path: Root path to access from
    :return: Iterator over path, sha, mode
"""
import warnings
    warnings.warn("Use iter_fresh_objects instead.",
                  PendingDeprecationWarning)
for entry in iter_fresh_objects(
index, root_path, include_deleted=True):
if entry[1] is None:
del index[entry[0]]
else:
yield entry
def iter_fresh_objects(paths, root_path, include_deleted=False,
object_store=None):
"""Iterate over versions of objecs on disk referenced by index.
:param index: Index file
:param root_path: Root path to access from
:param include_deleted: Include deleted entries with sha and
mode set to None
:param object_store: Optional object store to report new items to
:return: Iterator over path, sha, mode
"""
for path, entry in iter_fresh_entries(paths, root_path,
object_store=object_store):
if entry is None:
if include_deleted:
yield path, None, None
else:
entry = IndexEntry(*entry)
yield path, entry.sha, cleanup_mode(entry.mode)
def refresh_index(index, root_path):
"""Refresh the contents of an index.
    This is the equivalent of the index update performed by 'git commit -a'.
:param index: Index to update
:param root_path: Root filesystem path
"""
    for path, entry in iter_fresh_entries(index, root_path):
        if entry:
            index[path] = entry
diff --git a/dulwich/porcelain.py b/dulwich/porcelain.py
index cf5b2169..210c74fb 100644
--- a/dulwich/porcelain.py
+++ b/dulwich/porcelain.py
@@ -1,1528 +1,1532 @@
# porcelain.py -- Porcelain-like layer on top of Dulwich
# Copyright (C) 2013 Jelmer Vernooij
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# for a copy of the GNU General Public License
# and for a copy of the Apache
# License, Version 2.0.
#
"""Simple wrapper that provides porcelain-like functions on top of Dulwich.
Currently implemented:
* archive
* add
* branch{_create,_delete,_list}
* check-ignore
* checkout
* clone
* commit
* commit-tree
* daemon
* describe
* diff-tree
* fetch
* init
* ls-files
* ls-remote
* ls-tree
* pull
* push
* rm
* remote{_add}
* receive-pack
* reset
* rev-list
* tag{_create,_delete,_list}
* upload-pack
* update-server-info
* status
* symbolic-ref
These functions are meant to behave similarly to the git subcommands.
Differences in behaviour are considered bugs.
Functions should generally accept both unicode strings and bytestrings
"""
from collections import namedtuple
from contextlib import (
closing,
contextmanager,
)
from io import BytesIO, RawIOBase
import datetime
import os
import posixpath
import stat
import sys
import time
from dulwich.archive import (
tar_stream,
)
from dulwich.client import (
get_transport_and_path,
)
from dulwich.config import (
StackedConfig,
)
from dulwich.diff_tree import (
CHANGE_ADD,
CHANGE_DELETE,
CHANGE_MODIFY,
CHANGE_RENAME,
CHANGE_COPY,
RENAME_CHANGE_TYPES,
)
from dulwich.errors import (
SendPackError,
UpdateRefsError,
)
from dulwich.ignore import IgnoreFilterManager
from dulwich.index import (
blob_from_path_and_stat,
get_unstaged_changes,
)
from dulwich.object_store import (
tree_lookup_path,
)
from dulwich.objects import (
Commit,
Tag,
format_timezone,
parse_timezone,
pretty_format_tree_entry,
)
from dulwich.objectspec import (
parse_commit,
parse_object,
parse_ref,
parse_reftuples,
parse_tree,
)
from dulwich.pack import (
write_pack_index,
write_pack_objects,
)
from dulwich.patch import write_tree_diff
from dulwich.protocol import (
Protocol,
ZERO_SHA,
)
from dulwich.refs import (
ANNOTATED_TAG_SUFFIX,
LOCAL_BRANCH_PREFIX,
strip_peeled_refs,
)
from dulwich.repo import (BaseRepo, Repo)
from dulwich.server import (
FileSystemBackend,
TCPGitServer,
ReceivePackHandler,
UploadPackHandler,
update_server_info as server_update_server_info,
)
# Module level tuple definition for status output
GitStatus = namedtuple('GitStatus', 'staged unstaged untracked')
class NoneStream(RawIOBase):
"""Fallback if stdout or stderr are unavailable, does nothing."""
def read(self, size=-1):
return None
def readall(self):
return None
def readinto(self, b):
return None
def write(self, b):
return None
if sys.version_info[0] == 2:
default_bytes_out_stream = sys.stdout or NoneStream()
default_bytes_err_stream = sys.stderr or NoneStream()
else:
default_bytes_out_stream = (
getattr(sys.stdout, 'buffer', None) or NoneStream())
default_bytes_err_stream = (
getattr(sys.stderr, 'buffer', None) or NoneStream())
DEFAULT_ENCODING = 'utf-8'
class RemoteExists(Exception):
"""Raised when the remote already exists."""
def open_repo(path_or_repo):
"""Open an argument that can be a repository or a path for a repository."""
if isinstance(path_or_repo, BaseRepo):
return path_or_repo
return Repo(path_or_repo)
@contextmanager
def _noop_context_manager(obj):
"""Context manager that has the same api as closing but does nothing."""
yield obj
def open_repo_closing(path_or_repo):
"""Open an argument that can be a repository or a path for a repository.
    Returns a context manager that will close the repo on exit if the
    argument is a path, or does nothing if the argument is a repo.
"""
if isinstance(path_or_repo, BaseRepo):
return _noop_context_manager(path_or_repo)
return closing(Repo(path_or_repo))
def path_to_tree_path(repopath, path):
"""Convert a path to a path usable in an index, e.g. bytes and relative to
the repository root.
:param repopath: Repository path, absolute or relative to the cwd
:param path: A path, absolute or relative to the cwd
:return: A path formatted for use in e.g. an index
"""
if not isinstance(path, bytes):
path = path.encode(sys.getfilesystemencoding())
if not isinstance(repopath, bytes):
repopath = repopath.encode(sys.getfilesystemencoding())
treepath = os.path.relpath(path, repopath)
if treepath.startswith(b'..'):
raise ValueError('Path not in repo')
if os.path.sep != '/':
treepath = treepath.replace(os.path.sep.encode('ascii'), b'/')
return treepath
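# Usage sketch (hypothetical paths): translate a filesystem path into the
# repo-relative bytes form used as an index key.
#
#     path_to_tree_path('/src/project', '/src/project/pkg/mod.py')
#     # -> b'pkg/mod.py'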
def archive(repo, committish=None, outstream=default_bytes_out_stream,
errstream=default_bytes_err_stream):
"""Create an archive.
:param repo: Path of repository for which to generate an archive.
:param committish: Commit SHA1 or ref to use
:param outstream: Output stream (defaults to stdout)
:param errstream: Error stream (defaults to stderr)
"""
if committish is None:
committish = "HEAD"
with open_repo_closing(repo) as repo_obj:
c = parse_commit(repo_obj, committish)
for chunk in tar_stream(
repo_obj.object_store, repo_obj.object_store[c.tree],
c.commit_time):
outstream.write(chunk)
def update_server_info(repo="."):
"""Update server info files for a repository.
:param repo: path to the repository
"""
with open_repo_closing(repo) as r:
server_update_server_info(r)
def symbolic_ref(repo, ref_name, force=False):
"""Set git symbolic ref into HEAD.
:param repo: path to the repository
:param ref_name: short name of the new ref
    :param force: force the symbolic ref to be set without checking whether
        it exists in refs/heads
"""
with open_repo_closing(repo) as repo_obj:
ref_path = _make_branch_ref(ref_name)
if not force and ref_path not in repo_obj.refs.keys():
raise ValueError('fatal: ref `%s` is not a ref' % ref_name)
repo_obj.refs.set_symbolic_ref(b'HEAD', ref_path)
def commit(repo=".", message=None, author=None, committer=None, encoding=None):
"""Create a new commit.
:param repo: Path to repository
:param message: Optional commit message
:param author: Optional author name and email
    :param committer: Optional committer name and email
    :param encoding: Optional encoding for the message, author and committer
        (defaults to utf-8)
    :return: SHA1 of the new commit
"""
# FIXME: Support --all argument
# FIXME: Support --signoff argument
if getattr(message, 'encode', None):
message = message.encode(encoding or DEFAULT_ENCODING)
if getattr(author, 'encode', None):
author = author.encode(encoding or DEFAULT_ENCODING)
if getattr(committer, 'encode', None):
committer = committer.encode(encoding or DEFAULT_ENCODING)
with open_repo_closing(repo) as r:
return r.do_commit(
message=message, author=author, committer=committer,
encoding=encoding)
def commit_tree(repo, tree, message=None, author=None, committer=None):
"""Create a new commit object.
:param repo: Path to repository
    :param tree: An existing tree object
    :param message: Optional commit message
:param author: Optional author name and email
:param committer: Optional committer name and email
"""
with open_repo_closing(repo) as r:
return r.do_commit(
message=message, tree=tree, committer=committer, author=author)
def init(path=".", bare=False):
"""Create a new git repository.
:param path: Path to repository.
:param bare: Whether to create a bare repository.
:return: A Repo instance
"""
if not os.path.exists(path):
os.mkdir(path)
if bare:
return Repo.init_bare(path)
else:
return Repo.init(path)
def clone(source, target=None, bare=False, checkout=None,
errstream=default_bytes_err_stream, outstream=None,
origin=b"origin", depth=None, **kwargs):
"""Clone a local or remote git repository.
:param source: Path or URL for source repository
:param target: Path to target repository (optional)
:param bare: Whether or not to create a bare repository
:param checkout: Whether or not to check-out HEAD after cloning
:param errstream: Optional stream to write progress to
:param outstream: Optional stream to write progress to (deprecated)
:param origin: Name of remote from the repository used to clone
:param depth: Depth to fetch at
:return: The new repository
"""
# TODO(jelmer): This code overlaps quite a bit with Repo.clone
if outstream is not None:
import warnings
warnings.warn(
"outstream= has been deprecated in favour of errstream=.",
DeprecationWarning, stacklevel=3)
errstream = outstream
if checkout is None:
checkout = (not bare)
if checkout and bare:
raise ValueError("checkout and bare are incompatible")
if target is None:
target = source.split("/")[-1]
if not os.path.exists(target):
os.mkdir(target)
if bare:
r = Repo.init_bare(target)
else:
r = Repo.init(target)
reflog_message = b'clone: from ' + source.encode('utf-8')
try:
fetch_result = fetch(
r, source, origin, errstream=errstream, message=reflog_message,
depth=depth, **kwargs)
target_config = r.get_config()
if not isinstance(source, bytes):
source = source.encode(DEFAULT_ENCODING)
target_config.set((b'remote', origin), b'url', source)
target_config.set(
(b'remote', origin), b'fetch',
b'+refs/heads/*:refs/remotes/' + origin + b'/*')
target_config.write_to_path()
# TODO(jelmer): Support symref capability,
# https://github.com/jelmer/dulwich/issues/485
try:
head = r[fetch_result[b'HEAD']]
except KeyError:
head = None
else:
r[b'HEAD'] = head.id
if checkout and not bare and head is not None:
errstream.write(b'Checking out ' + head.id + b'\n')
r.reset_index(head.tree)
except BaseException:
r.close()
raise
return r
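# Usage sketch: clone over HTTPS into a new directory; the URL and target
# directory are hypothetical.
#
#     repo = clone('https://example.com/some/repo.git', 'repo')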
def add(repo=".", paths=None):
"""Add files to the staging area.
:param repo: Repository for the files
:param paths: Paths to add. No value passed stages all modified files.
:return: Tuple with set of added files and ignored files
"""
ignored = set()
with open_repo_closing(repo) as r:
ignore_manager = IgnoreFilterManager.from_repo(r)
if not paths:
paths = list(
get_untracked_paths(os.getcwd(), r.path, r.open_index()))
relpaths = []
if not isinstance(paths, list):
paths = [paths]
for p in paths:
relpath = os.path.relpath(p, r.path)
if relpath.startswith('..' + os.path.sep):
raise ValueError('path %r is not in repo' % relpath)
# FIXME: Support patterns, directories.
if ignore_manager.is_ignored(relpath):
ignored.add(relpath)
continue
relpaths.append(relpath)
r.stage(relpaths)
return (relpaths, ignored)
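# Usage sketch: stage a single file and see what was skipped as ignored;
# "README.md" is a hypothetical path relative to the current directory.
#
#     added, ignored = add('.', paths=['README.md'])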
def _is_subdir(subdir, parentdir):
"""Check whether subdir is parentdir or a subdir of parentdir
If parentdir or subdir is a relative path, it will be disamgibuated
relative to the pwd.
"""
parentdir_abs = os.path.realpath(parentdir) + os.path.sep
subdir_abs = os.path.realpath(subdir) + os.path.sep
return subdir_abs.startswith(parentdir_abs)
# TODO: option to remove ignored files also, in line with `git clean -fdx`
def clean(repo=".", target_dir=None):
"""Remove any untracked files from the target directory recursively
Equivalent to running `git clean -fd` in target_dir.
:param repo: Repository where the files may be tracked
:param target_dir: Directory to clean - current directory if None
"""
if target_dir is None:
target_dir = os.getcwd()
with open_repo_closing(repo) as r:
if not _is_subdir(target_dir, r.path):
raise ValueError("target_dir must be in the repo's working dir")
index = r.open_index()
ignore_manager = IgnoreFilterManager.from_repo(r)
paths_in_wd = _walk_working_dir_paths(target_dir, r.path)
# Reverse file visit order, so that files and subdirectories are
# removed before containing directory
for ap, is_dir in reversed(list(paths_in_wd)):
if is_dir:
# All subdirectories and files have been removed if untracked,
# so dir contains no tracked files iff it is empty.
is_empty = len(os.listdir(ap)) == 0
if is_empty:
os.rmdir(ap)
else:
ip = path_to_tree_path(r.path, ap)
is_tracked = ip in index
rp = os.path.relpath(ap, r.path)
is_ignored = ignore_manager.is_ignored(rp)
if not is_tracked and not is_ignored:
os.remove(ap)
def remove(repo=".", paths=None, cached=False):
"""Remove files from the staging area.
:param repo: Repository for the files
    :param paths: Paths to remove
    :param cached: Only remove the paths from the index; leave working tree
        files in place
"""
with open_repo_closing(repo) as r:
index = r.open_index()
for p in paths:
full_path = os.path.abspath(p).encode(sys.getfilesystemencoding())
tree_path = path_to_tree_path(r.path, p)
try:
index_sha = index[tree_path].sha
except KeyError:
raise Exception('%s did not match any files' % p)
if not cached:
try:
st = os.lstat(full_path)
except OSError:
pass
else:
try:
blob = blob_from_path_and_stat(full_path, st)
except IOError:
pass
else:
try:
committed_sha = tree_lookup_path(
r.__getitem__, r[r.head()].tree, tree_path)[1]
except KeyError:
committed_sha = None
if blob.id != index_sha and index_sha != committed_sha:
raise Exception(
'file has staged content differing '
'from both the file and head: %s' % p)
if index_sha != committed_sha:
raise Exception(
'file has staged changes: %s' % p)
os.remove(full_path)
del index[tree_path]
index.write()
rm = remove
def commit_decode(commit, contents, default_encoding=DEFAULT_ENCODING):
if commit.encoding is not None:
return contents.decode(commit.encoding, "replace")
return contents.decode(default_encoding, "replace")
def print_commit(commit, decode, outstream=sys.stdout):
"""Write a human-readable commit log entry.
    :param commit: A `Commit` object
    :param decode: Function for decoding bytes to unicode string
:param outstream: A stream file to write to
"""
outstream.write("-" * 50 + "\n")
outstream.write("commit: " + commit.id.decode('ascii') + "\n")
if len(commit.parents) > 1:
outstream.write(
"merge: " +
"...".join([c.decode('ascii') for c in commit.parents[1:]]) + "\n")
outstream.write("Author: " + decode(commit.author) + "\n")
if commit.author != commit.committer:
outstream.write("Committer: " + decode(commit.committer) + "\n")
time_tuple = time.gmtime(commit.author_time + commit.author_timezone)
time_str = time.strftime("%a %b %d %Y %H:%M:%S", time_tuple)
timezone_str = format_timezone(commit.author_timezone).decode('ascii')
outstream.write("Date: " + time_str + " " + timezone_str + "\n")
outstream.write("\n")
outstream.write(decode(commit.message) + "\n")
outstream.write("\n")
def print_tag(tag, decode, outstream=sys.stdout):
"""Write a human-readable tag.
:param tag: A `Tag` object
:param decode: Function for decoding bytes to unicode string
:param outstream: A stream to write to
"""
outstream.write("Tagger: " + decode(tag.tagger) + "\n")
time_tuple = time.gmtime(tag.tag_time + tag.tag_timezone)
time_str = time.strftime("%a %b %d %Y %H:%M:%S", time_tuple)
timezone_str = format_timezone(tag.tag_timezone).decode('ascii')
outstream.write("Date: " + time_str + " " + timezone_str + "\n")
outstream.write("\n")
outstream.write(decode(tag.message) + "\n")
outstream.write("\n")
def show_blob(repo, blob, decode, outstream=sys.stdout):
"""Write a blob to a stream.
:param repo: A `Repo` object
:param blob: A `Blob` object
:param decode: Function for decoding bytes to unicode string
:param outstream: A stream file to write to
"""
outstream.write(decode(blob.data))
def show_commit(repo, commit, decode, outstream=sys.stdout):
"""Show a commit to a stream.
:param repo: A `Repo` object
:param commit: A `Commit` object
:param decode: Function for decoding bytes to unicode string
:param outstream: Stream to write to
"""
print_commit(commit, decode=decode, outstream=outstream)
if commit.parents:
parent_commit = repo[commit.parents[0]]
base_tree = parent_commit.tree
else:
base_tree = None
diffstream = BytesIO()
write_tree_diff(
diffstream,
repo.object_store, base_tree, commit.tree)
diffstream.seek(0)
outstream.write(
diffstream.getvalue().decode(
commit.encoding or DEFAULT_ENCODING, 'replace'))
def show_tree(repo, tree, decode, outstream=sys.stdout):
"""Print a tree to a stream.
:param repo: A `Repo` object
:param tree: A `Tree` object
:param decode: Function for decoding bytes to unicode string
:param outstream: Stream to write to
"""
for n in tree:
outstream.write(decode(n) + "\n")
def show_tag(repo, tag, decode, outstream=sys.stdout):
"""Print a tag to a stream.
:param repo: A `Repo` object
:param tag: A `Tag` object
:param decode: Function for decoding bytes to unicode string
:param outstream: Stream to write to
"""
print_tag(tag, decode, outstream)
show_object(repo, repo[tag.object[1]], decode, outstream)
def show_object(repo, obj, decode, outstream):
return {
b"tree": show_tree,
b"blob": show_blob,
b"commit": show_commit,
b"tag": show_tag,
}[obj.type_name](repo, obj, decode, outstream)
def print_name_status(changes):
"""Print a simple status summary, listing changed files.
"""
for change in changes:
if not change:
continue
if isinstance(change, list):
change = change[0]
if change.type == CHANGE_ADD:
path1 = change.new.path
path2 = ''
kind = 'A'
elif change.type == CHANGE_DELETE:
path1 = change.old.path
path2 = ''
kind = 'D'
elif change.type == CHANGE_MODIFY:
path1 = change.new.path
path2 = ''
kind = 'M'
elif change.type in RENAME_CHANGE_TYPES:
path1 = change.old.path
path2 = change.new.path
if change.type == CHANGE_RENAME:
kind = 'R'
elif change.type == CHANGE_COPY:
kind = 'C'
yield '%-8s%-20s%-20s' % (kind, path1, path2)
def log(repo=".", paths=None, outstream=sys.stdout, max_entries=None,
reverse=False, name_status=False):
"""Write commit logs.
:param repo: Path to repository
:param paths: Optional set of specific paths to print entries for
:param outstream: Stream to write log output to
:param reverse: Reverse order in which entries are printed
:param name_status: Print name status
:param max_entries: Optional maximum number of entries to display
"""
with open_repo_closing(repo) as r:
walker = r.get_walker(
max_entries=max_entries, paths=paths, reverse=reverse)
for entry in walker:
def decode(x):
return commit_decode(entry.commit, x)
print_commit(entry.commit, decode, outstream)
if name_status:
outstream.writelines(
[l+'\n' for l in print_name_status(entry.changes())])
# TODO(jelmer): better default for encoding?
def show(repo=".", objects=None, outstream=sys.stdout,
default_encoding=DEFAULT_ENCODING):
"""Print the changes in a commit.
:param repo: Path to repository
:param objects: Objects to show (defaults to [HEAD])
:param outstream: Stream to write to
:param default_encoding: Default encoding to use if none is set in the
commit
"""
if objects is None:
objects = ["HEAD"]
if not isinstance(objects, list):
objects = [objects]
with open_repo_closing(repo) as r:
for objectish in objects:
o = parse_object(r, objectish)
if isinstance(o, Commit):
def decode(x):
return commit_decode(o, x, default_encoding)
else:
def decode(x):
return x.decode(default_encoding)
show_object(r, o, decode, outstream)
def diff_tree(repo, old_tree, new_tree, outstream=sys.stdout):
"""Compares the content and mode of blobs found via two tree objects.
:param repo: Path to repository
:param old_tree: Id of old tree
:param new_tree: Id of new tree
:param outstream: Stream to write to
"""
with open_repo_closing(repo) as r:
write_tree_diff(outstream, r.object_store, old_tree, new_tree)
def rev_list(repo, commits, outstream=sys.stdout):
"""Lists commit objects in reverse chronological order.
:param repo: Path to repository
:param commits: Commits over which to iterate
:param outstream: Stream to write to
"""
with open_repo_closing(repo) as r:
for entry in r.get_walker(include=[r[c].id for c in commits]):
outstream.write(entry.commit.id + b"\n")
def tag(*args, **kwargs):
import warnings
warnings.warn("tag has been deprecated in favour of tag_create.",
DeprecationWarning)
return tag_create(*args, **kwargs)
def tag_create(
repo, tag, author=None, message=None, annotated=False,
objectish="HEAD", tag_time=None, tag_timezone=None,
sign=False):
"""Creates a tag in git via dulwich calls:
:param repo: Path to repository
:param tag: tag string
:param author: tag author (optional, if annotated is set)
:param message: tag message (optional)
:param annotated: whether to create an annotated tag
:param objectish: object the tag should point at, defaults to HEAD
:param tag_time: Optional time for annotated tag
:param tag_timezone: Optional timezone for annotated tag
:param sign: GPG Sign the tag
"""
with open_repo_closing(repo) as r:
object = parse_object(r, objectish)
if annotated:
# Create the tag object
tag_obj = Tag()
if author is None:
# TODO(jelmer): Don't use repo private method.
author = r._get_user_identity(r.get_config_stack())
tag_obj.tagger = author
tag_obj.message = message
tag_obj.name = tag
tag_obj.object = (type(object), object.id)
if tag_time is None:
tag_time = int(time.time())
tag_obj.tag_time = tag_time
if tag_timezone is None:
# TODO(jelmer) Use current user timezone rather than UTC
tag_timezone = 0
elif isinstance(tag_timezone, str):
tag_timezone = parse_timezone(tag_timezone)
tag_obj.tag_timezone = tag_timezone
if sign:
import gpg
with gpg.Context(armor=True) as c:
tag_obj.signature, unused_result = c.sign(
tag_obj.as_raw_string())
r.object_store.add_object(tag_obj)
tag_id = tag_obj.id
else:
tag_id = object.id
r.refs[_make_tag_ref(tag)] = tag_id
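# Usage sketch: create a lightweight and an annotated tag on HEAD; tag names
# and identity are hypothetical.
#
#     tag_create('.', b'v0.1')
#     tag_create('.', b'v0.2', author=b'Jane <jane@example.com>',
#                message=b'release 0.2', annotated=True)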
def list_tags(*args, **kwargs):
import warnings
warnings.warn("list_tags has been deprecated in favour of tag_list.",
DeprecationWarning)
return tag_list(*args, **kwargs)
def tag_list(repo, outstream=sys.stdout):
"""List all tags.
:param repo: Path to repository
:param outstream: Stream to write tags to
"""
with open_repo_closing(repo) as r:
tags = sorted(r.refs.as_dict(b"refs/tags"))
return tags
def tag_delete(repo, name):
"""Remove a tag.
:param repo: Path to repository
:param name: Name of tag to remove
"""
with open_repo_closing(repo) as r:
if isinstance(name, bytes):
names = [name]
elif isinstance(name, list):
names = name
else:
raise TypeError("Unexpected tag name type %r" % name)
for name in names:
del r.refs[_make_tag_ref(name)]
def reset(repo, mode, treeish="HEAD"):
"""Reset current HEAD to the specified state.
:param repo: Path to repository
:param mode: Mode ("hard", "soft", "mixed")
:param treeish: Treeish to reset to
"""
if mode != "hard":
raise ValueError("hard is the only mode currently supported")
with open_repo_closing(repo) as r:
tree = parse_tree(r, treeish)
r.reset_index(tree.id)
def push(repo, remote_location, refspecs,
outstream=default_bytes_out_stream,
errstream=default_bytes_err_stream, **kwargs):
"""Remote push with dulwich via dulwich.client
:param repo: Path to repository
:param remote_location: Location of the remote
:param refspecs: Refs to push to remote
    :param outstream: A stream file to write output to
    :param errstream: A stream file to write errors to
"""
# Open the repo
with open_repo_closing(repo) as r:
# Get the client and path
client, path = get_transport_and_path(
remote_location, config=r.get_config_stack(), **kwargs)
selected_refs = []
def update_refs(refs):
selected_refs.extend(parse_reftuples(r.refs, refs, refspecs))
new_refs = {}
# TODO: Handle selected_refs == {None: None}
for (lh, rh, force) in selected_refs:
if lh is None:
new_refs[rh] = ZERO_SHA
else:
new_refs[rh] = r.refs[lh]
return new_refs
err_encoding = getattr(errstream, 'encoding', None) or DEFAULT_ENCODING
remote_location_bytes = client.get_url(path).encode(err_encoding)
try:
client.send_pack(
path, update_refs,
generate_pack_data=r.object_store.generate_pack_data,
progress=errstream.write)
errstream.write(
b"Push to " + remote_location_bytes + b" successful.\n")
except (UpdateRefsError, SendPackError) as e:
errstream.write(b"Push to " + remote_location_bytes +
b" failed -> " + e.message.encode(err_encoding) +
b"\n")
def pull(repo, remote_location=None, refspecs=None,
outstream=default_bytes_out_stream,
errstream=default_bytes_err_stream, **kwargs):
"""Pull from remote via dulwich.client
:param repo: Path to repository
:param remote_location: Location of the remote
    :param refspecs: Refspecs to fetch
    :param outstream: A stream file to write output to
    :param errstream: A stream file to write errors to
"""
# Open the repo
with open_repo_closing(repo) as r:
if remote_location is None:
# TODO(jelmer): Lookup 'remote' for current branch in config
raise NotImplementedError(
"looking up remote from branch config not supported yet")
if refspecs is None:
refspecs = [b"HEAD"]
selected_refs = []
def determine_wants(remote_refs):
selected_refs.extend(
parse_reftuples(remote_refs, r.refs, refspecs))
return [remote_refs[lh] for (lh, rh, force) in selected_refs]
client, path = get_transport_and_path(
remote_location, config=r.get_config_stack(), **kwargs)
fetch_result = client.fetch(
path, r, progress=errstream.write, determine_wants=determine_wants)
for (lh, rh, force) in selected_refs:
r.refs[rh] = fetch_result.refs[lh]
if selected_refs:
r[b'HEAD'] = fetch_result.refs[selected_refs[0][1]]
# Perform 'git checkout .' - syncs staged changes
tree = r[b"HEAD"].tree
r.reset_index(tree=tree)
def status(repo=".", ignored=False):
"""Returns staged, unstaged, and untracked changes relative to the HEAD.
:param repo: Path to repository or repository object
:param ignored: Whether to include ignored files in `untracked`
:return: GitStatus tuple,
staged - dict with lists of staged paths (diff index/HEAD)
unstaged - list of unstaged paths (diff index/working-tree)
untracked - list of untracked, un-ignored & non-.git paths
"""
with open_repo_closing(repo) as r:
# 1. Get status of staged
tracked_changes = get_tree_changes(r)
# 2. Get status of unstaged
index = r.open_index()
normalizer = r.get_blob_normalizer()
filter_callback = normalizer.checkin_normalize
unstaged_changes = list(
get_unstaged_changes(index, r.path, filter_callback)
)
ignore_manager = IgnoreFilterManager.from_repo(r)
untracked_paths = get_untracked_paths(r.path, r.path, index)
if ignored:
untracked_changes = list(untracked_paths)
else:
untracked_changes = [
p for p in untracked_paths
if not ignore_manager.is_ignored(p)]
return GitStatus(tracked_changes, unstaged_changes, untracked_changes)
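# Usage sketch: unpack the GitStatus namedtuple returned by status(); staged
# is a dict keyed by 'add', 'delete' and 'modify'.
#
#     st = status('.')
#     print(st.staged['modify'], st.unstaged, st.untracked)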
def _walk_working_dir_paths(frompath, basepath):
"""Get path, is_dir for files in working dir from frompath
:param frompath: Path to begin walk
:param basepath: Path to compare to
"""
for dirpath, dirnames, filenames in os.walk(frompath):
# Skip .git and below.
if '.git' in dirnames:
dirnames.remove('.git')
if dirpath != basepath:
continue
if '.git' in filenames:
filenames.remove('.git')
if dirpath != basepath:
continue
if dirpath != frompath:
yield dirpath, True
for filename in filenames:
filepath = os.path.join(dirpath, filename)
yield filepath, False
def get_untracked_paths(frompath, basepath, index):
"""Get untracked paths.
    :param frompath: Path to walk
:param basepath: Path to compare to
:param index: Index to check against
"""
for ap, is_dir in _walk_working_dir_paths(frompath, basepath):
if not is_dir:
ip = path_to_tree_path(basepath, ap)
if ip not in index:
yield os.path.relpath(ap, frompath)
def get_tree_changes(repo):
"""Return add/delete/modify changes to tree by comparing index to HEAD.
:param repo: repo path or object
:return: dict with lists for each type of change
"""
with open_repo_closing(repo) as r:
index = r.open_index()
# Compares the Index to the HEAD & determines changes
# Iterate through the changes and report add/delete/modify
# TODO: call out to dulwich.diff_tree somehow.
tracked_changes = {
'add': [],
'delete': [],
'modify': [],
}
try:
tree_id = r[b'HEAD'].tree
except KeyError:
tree_id = None
for change in index.changes_from_tree(r.object_store, tree_id):
if not change[0][0]:
tracked_changes['add'].append(change[0][1])
elif not change[0][1]:
tracked_changes['delete'].append(change[0][0])
elif change[0][0] == change[0][1]:
tracked_changes['modify'].append(change[0][0])
else:
raise AssertionError('git mv ops not yet supported')
return tracked_changes
def daemon(path=".", address=None, port=None):
"""Run a daemon serving Git requests over TCP/IP.
:param path: Path to the directory to serve.
:param address: Optional address to listen on (defaults to ::)
:param port: Optional port to listen on (defaults to TCP_GIT_PORT)
"""
# TODO(jelmer): Support git-daemon-export-ok and --export-all.
backend = FileSystemBackend(path)
server = TCPGitServer(backend, address, port)
server.serve_forever()
def web_daemon(path=".", address=None, port=None):
"""Run a daemon serving Git requests over HTTP.
:param path: Path to the directory to serve
:param address: Optional address to listen on (defaults to ::)
:param port: Optional port to listen on (defaults to 80)
"""
from dulwich.web import (
make_wsgi_chain,
make_server,
WSGIRequestHandlerLogger,
WSGIServerLogger)
backend = FileSystemBackend(path)
app = make_wsgi_chain(backend)
server = make_server(address, port, app,
handler_class=WSGIRequestHandlerLogger,
server_class=WSGIServerLogger)
server.serve_forever()
def upload_pack(path=".", inf=None, outf=None):
"""Upload a pack file after negotiating its contents using smart protocol.
:param path: Path to the repository
:param inf: Input stream to communicate with client
:param outf: Output stream to communicate with client
"""
if outf is None:
outf = getattr(sys.stdout, 'buffer', sys.stdout)
if inf is None:
inf = getattr(sys.stdin, 'buffer', sys.stdin)
path = os.path.expanduser(path)
backend = FileSystemBackend(path)
def send_fn(data):
outf.write(data)
outf.flush()
proto = Protocol(inf.read, send_fn)
handler = UploadPackHandler(backend, [path], proto)
# FIXME: Catch exceptions and write a single-line summary to outf.
handler.handle()
return 0
def receive_pack(path=".", inf=None, outf=None):
"""Receive a pack file after negotiating its contents using smart protocol.
:param path: Path to the repository
:param inf: Input stream to communicate with client
:param outf: Output stream to communicate with client
"""
if outf is None:
outf = getattr(sys.stdout, 'buffer', sys.stdout)
if inf is None:
inf = getattr(sys.stdin, 'buffer', sys.stdin)
path = os.path.expanduser(path)
backend = FileSystemBackend(path)
def send_fn(data):
outf.write(data)
outf.flush()
proto = Protocol(inf.read, send_fn)
handler = ReceivePackHandler(backend, [path], proto)
# FIXME: Catch exceptions and write a single-line summary to outf.
handler.handle()
return 0
def _make_branch_ref(name):
if getattr(name, 'encode', None):
name = name.encode(DEFAULT_ENCODING)
return LOCAL_BRANCH_PREFIX + name
def _make_tag_ref(name):
if getattr(name, 'encode', None):
name = name.encode(DEFAULT_ENCODING)
return b"refs/tags/" + name
def branch_delete(repo, name):
"""Delete a branch.
:param repo: Path to the repository
:param name: Name of the branch
"""
with open_repo_closing(repo) as r:
if isinstance(name, list):
names = name
else:
names = [name]
for name in names:
del r.refs[_make_branch_ref(name)]
def branch_create(repo, name, objectish=None, force=False):
"""Create a branch.
:param repo: Path to the repository
:param name: Name of the new branch
:param objectish: Target object to point new branch at (defaults to HEAD)
:param force: Force creation of branch, even if it already exists
"""
with open_repo_closing(repo) as r:
if objectish is None:
objectish = "HEAD"
object = parse_object(r, objectish)
refname = _make_branch_ref(name)
ref_message = b"branch: Created from " + objectish.encode('utf-8')
if force:
r.refs.set_if_equals(refname, None, object.id, message=ref_message)
else:
if not r.refs.add_if_new(refname, object.id, message=ref_message):
raise KeyError("Branch with name %s already exists." % name)
def branch_list(repo):
"""List all branches.
:param repo: Path to the repository
"""
with open_repo_closing(repo) as r:
return r.refs.keys(base=LOCAL_BRANCH_PREFIX)
def active_branch(repo):
"""Return the active branch in the repository, if any.
Args:
repo: Repository to open
Returns:
branch name
Raises:
KeyError: if the repository does not have a working tree
IndexError: if HEAD is floating
"""
with open_repo_closing(repo) as r:
active_ref = r.refs.follow(b'HEAD')[0][1]
if not active_ref.startswith(LOCAL_BRANCH_PREFIX):
raise ValueError(active_ref)
return active_ref[len(LOCAL_BRANCH_PREFIX):]
def fetch(repo, remote_location, remote_name=b'origin', outstream=sys.stdout,
errstream=default_bytes_err_stream, message=None, depth=None,
prune=False, prune_tags=False, **kwargs):
"""Fetch objects from a remote server.
- :param repo: Path to the repository
- :param remote_location: String identifying a remote server
- :param remote_name: Name for remote server
- :param outstream: Output stream (defaults to stdout)
- :param errstream: Error stream (defaults to stderr)
- :param message: Reflog message (defaults to b"fetch: from ")
- :param depth: Depth to fetch at
- :param prune: Prune remote removed refs
- :param prune_tags: Prune reomte removed tags
- :return: Dictionary with refs on the remote
+ Args:
+ repo: Path to the repository
+ remote_location: String identifying a remote server
+ remote_name: Name for remote server
+ outstream: Output stream (defaults to stdout)
+ errstream: Error stream (defaults to stderr)
+ message: Reflog message (defaults to b"fetch: from ")
+ depth: Depth to fetch at
+ prune: Prune remote removed refs
+ prune_tags: Prune remote removed tags
+ Returns:
+ Dictionary with refs on the remote
"""
if message is None:
message = b'fetch: from ' + remote_location.encode("utf-8")
with open_repo_closing(repo) as r:
client, path = get_transport_and_path(
remote_location, config=r.get_config_stack(), **kwargs)
fetch_result = client.fetch(path, r, progress=errstream.write,
depth=depth)
stripped_refs = strip_peeled_refs(fetch_result.refs)
branches = {
n[len(LOCAL_BRANCH_PREFIX):]: v for (n, v) in stripped_refs.items()
if n.startswith(LOCAL_BRANCH_PREFIX)}
r.refs.import_refs(
b'refs/remotes/' + remote_name, branches, message=message,
prune=prune)
tags = {
n[len(b'refs/tags/'):]: v for (n, v) in stripped_refs.items()
if n.startswith(b'refs/tags/') and
not n.endswith(ANNOTATED_TAG_SUFFIX)}
r.refs.import_refs(
b'refs/tags', tags, message=message,
prune=prune_tags)
return fetch_result.refs
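# Usage sketch: fetch from a remote URL into the current repository; the URL
# is hypothetical, and b'HEAD' is only present if the server advertises it.
#
#     refs = fetch('.', 'https://example.com/some/repo.git')
#     print(refs.get(b'HEAD'))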
def ls_remote(remote, config=None, **kwargs):
"""List the refs in a remote.
- :param remote: Remote repository location
- :param config: Configuration to use
- :return: Dictionary with remote refs
+ Args:
+ remote: Remote repository location
+ config: Configuration to use
+ Returns:
+ Dictionary with remote refs
"""
if config is None:
config = StackedConfig.default()
client, host_path = get_transport_and_path(remote, config=config, **kwargs)
return client.get_refs(host_path)
def repack(repo):
"""Repack loose files in a repository.
Currently this only packs loose objects.
:param repo: Path to the repository
"""
with open_repo_closing(repo) as r:
r.object_store.pack_loose_objects()
def pack_objects(repo, object_ids, packf, idxf, delta_window_size=None):
"""Pack objects into a file.
:param repo: Path to the repository
:param object_ids: List of object ids to write
:param packf: File-like object to write to
    :param idxf: File-like object to write to (can be None)
    :param delta_window_size: Sliding window size for searching for deltas;
        None for the default
"""
with open_repo_closing(repo) as r:
entries, data_sum = write_pack_objects(
packf,
r.object_store.iter_shas((oid, None) for oid in object_ids),
delta_window_size=delta_window_size)
if idxf is not None:
entries = sorted([(k, v[0], v[1]) for (k, v) in entries.items()])
write_pack_index(idxf, entries, data_sum)
def ls_tree(repo, treeish=b"HEAD", outstream=sys.stdout, recursive=False,
name_only=False):
"""List contents of a tree.
:param repo: Path to the repository
    :param treeish: Tree id to list
:param outstream: Output stream (defaults to stdout)
:param recursive: Whether to recursively list files
:param name_only: Only print item name
"""
def list_tree(store, treeid, base):
for (name, mode, sha) in store[treeid].iteritems():
if base:
name = posixpath.join(base, name)
if name_only:
outstream.write(name + b"\n")
else:
outstream.write(pretty_format_tree_entry(name, mode, sha))
if stat.S_ISDIR(mode) and recursive:
list_tree(store, sha, name)
with open_repo_closing(repo) as r:
tree = parse_tree(r, treeish)
list_tree(r.object_store, tree.id, "")
def remote_add(repo, name, url):
"""Add a remote.
:param repo: Path to the repository
:param name: Remote name
:param url: Remote URL
"""
if not isinstance(name, bytes):
name = name.encode(DEFAULT_ENCODING)
if not isinstance(url, bytes):
url = url.encode(DEFAULT_ENCODING)
with open_repo_closing(repo) as r:
c = r.get_config()
section = (b'remote', name)
if c.has_section(section):
raise RemoteExists(section)
c.set(section, b"url", url)
c.write_to_path()
def check_ignore(repo, paths, no_index=False):
"""Debug gitignore files.
:param repo: Path to the repository
:param paths: List of paths to check for
:param no_index: Don't check index
:return: List of ignored files
"""
with open_repo_closing(repo) as r:
index = r.open_index()
ignore_manager = IgnoreFilterManager.from_repo(r)
for path in paths:
if not no_index and path_to_tree_path(r.path, path) in index:
continue
if os.path.isabs(path):
path = os.path.relpath(path, r.path)
if ignore_manager.is_ignored(path):
yield path
def update_head(repo, target, detached=False, new_branch=None):
"""Update HEAD to point at a new branch/commit.
Note that this does not actually update the working tree.
    :param repo: Path to the repository
    :param target: Branch or committish to switch to
    :param detached: Create a detached head
    :param new_branch: New branch to create
"""
with open_repo_closing(repo) as r:
if new_branch is not None:
to_set = _make_branch_ref(new_branch)
else:
to_set = b"HEAD"
if detached:
# TODO(jelmer): Provide some way so that the actual ref gets
# updated rather than what it points to, so the delete isn't
# necessary.
del r.refs[to_set]
r.refs[to_set] = parse_commit(r, target).id
else:
r.refs.set_symbolic_ref(to_set, parse_ref(r, target))
if new_branch is not None:
r.refs.set_symbolic_ref(b"HEAD", to_set)
def check_mailmap(repo, contact):
"""Check canonical name and email of contact.
:param repo: Path to the repository
:param contact: Contact name and/or email
:return: Canonical contact data
"""
with open_repo_closing(repo) as r:
from dulwich.mailmap import Mailmap
import errno
try:
mailmap = Mailmap.from_path(os.path.join(r.path, '.mailmap'))
except IOError as e:
if e.errno != errno.ENOENT:
raise
mailmap = Mailmap()
return mailmap.lookup(contact)
def fsck(repo):
"""Check a repository.
:param repo: A path to the repository
:return: Iterator over errors/warnings
"""
with open_repo_closing(repo) as r:
# TODO(jelmer): check pack files
# TODO(jelmer): check graph
# TODO(jelmer): check refs
for sha in r.object_store:
o = r.object_store[sha]
try:
o.check()
except Exception as e:
yield (sha, e)
def stash_list(repo):
"""List all stashes in a repository."""
with open_repo_closing(repo) as r:
from dulwich.stash import Stash
stash = Stash.from_repo(r)
return enumerate(list(stash.stashes()))
def stash_push(repo):
"""Push a new stash onto the stack."""
with open_repo_closing(repo) as r:
from dulwich.stash import Stash
stash = Stash.from_repo(r)
stash.push()
def stash_pop(repo):
"""Pop a new stash from the stack."""
with open_repo_closing(repo) as r:
from dulwich.stash import Stash
stash = Stash.from_repo(r)
stash.pop()
def ls_files(repo):
"""List all files in an index."""
with open_repo_closing(repo) as r:
return sorted(r.open_index())
def describe(repo):
"""Describe the repository version.
    :param repo: A path to the repository
    :return: a string description of the current git revision
Examples: "gabcdefh", "v0.1" or "v0.1-5-gabcdefh".
"""
# Get the repository
with open_repo_closing(repo) as r:
# Get a list of all tags
refs = r.get_refs()
tags = {}
for key, value in refs.items():
key = key.decode()
obj = r.get_object(value)
if u'tags' not in key:
continue
_, tag = key.rsplit(u'/', 1)
try:
commit = obj.object
except AttributeError:
continue
else:
commit = r.get_object(commit[1])
tags[tag] = [
datetime.datetime(*time.gmtime(commit.commit_time)[:6]),
commit.id.decode('ascii'),
]
sorted_tags = sorted(tags.items(),
key=lambda tag: tag[1][0],
reverse=True)
# If there are no tags, return the current commit
if len(sorted_tags) == 0:
return 'g{}'.format(r[r.head()].id.decode('ascii')[:7])
# We're now 0 commits from the top
commit_count = 0
# Get the latest commit
latest_commit = r[r.head()]
# Walk through all commits
walker = r.get_walker()
for entry in walker:
# Check if tag
commit_id = entry.commit.id.decode('ascii')
for tag in sorted_tags:
tag_name = tag[0]
tag_commit = tag[1][1]
if commit_id == tag_commit:
if commit_count == 0:
return tag_name
else:
return '{}-{}-g{}'.format(
tag_name,
commit_count,
latest_commit.id.decode('ascii')[:7])
commit_count += 1
# Return plain commit if no parent tag can be found
return 'g{}'.format(latest_commit.id.decode('ascii')[:7])
def get_object_by_path(repo, path, committish=None):
"""Get an object by path.
:param repo: A path to the repository
:param path: Path to look up
:param committish: Commit to look up path in
:return: A `ShaFile` object
"""
if committish is None:
committish = "HEAD"
# Get the repository
with open_repo_closing(repo) as r:
commit = parse_commit(r, committish)
base_tree = commit.tree
if not isinstance(path, bytes):
path = path.encode(commit.encoding or DEFAULT_ENCODING)
(mode, sha) = tree_lookup_path(
r.object_store.__getitem__,
base_tree, path)
return r[sha]
def write_tree(repo):
"""Write a tree object from the index.
:param repo: Repository for which to write tree
:return: tree id for the tree that was written
"""
with open_repo_closing(repo) as r:
return r.open_index().commit(r.object_store)
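# Usage sketch: write the current index as a tree and commit it explicitly
# via commit_tree; the message is hypothetical.
#
#     tree_id = write_tree('.')
#     commit_tree('.', tree_id, message=b'snapshot of the index')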