diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index e4ea739..a2d887b 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,169 +1,242 @@
 # Copyright (C) 2015  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Convert pygit2 objects to dictionaries suitable for swh.storage"""

 from pygit2 import GIT_OBJ_COMMIT

 from swh.core import hashutil

 from .utils import format_date

 HASH_ALGORITHMS = ['sha1', 'sha256']


 def blob_to_content(id, repo, log=None, max_content_size=None,
                     origin_id=None):
     """Format a blob as a content"""
     blob = repo[id]
     size = blob.size
     ret = {
         'sha1_git': id.raw,
         'length': blob.size,
         'status': 'absent'
     }

     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (id.hex, size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_repo': repo.path,
                              'swh_id': id.hex,
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret

     data = blob.data
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
     ret['data'] = data
     ret['status'] = 'visible'

     return ret


 def tree_to_directory(id, repo, log=None):
     """Format a tree as a directory"""
     ret = {
         'id': id.raw,
     }
     entries = []
     ret['entries'] = entries

     entry_type_map = {
         'tree': 'dir',
         'blob': 'file',
         'commit': 'rev',
     }

     for entry in repo[id]:
         entries.append({
             'type': entry_type_map[entry.type],
             'perms': entry.filemode,
             'name': entry._name,
             'target': entry.id.raw,
         })

     return ret


 def commit_to_revision(id, repo, log=None):
     """Format a commit as a revision"""
     commit = repo[id]

     author = commit.author
     committer = commit.committer
     return {
         'id': id.raw,
         'date': format_date(author),
         'committer_date': format_date(committer),
         'type': 'git',
         'directory': commit.tree_id.raw,
         'message': commit.raw_message,
         'metadata': None,
         'author': {
             'name': author.raw_name,
             'email': author.raw_email,
         },
         'committer': {
             'name': committer.raw_name,
             'email': committer.raw_email,
         },
         'synthetic': False,
         'parents': [p.raw for p in commit.parent_ids],
     }


 def annotated_tag_to_release(id, repo, log=None):
     """Format an annotated tag as a release"""
     tag = repo[id]

     tag_pointer = repo[tag.target]
     if tag_pointer.type != GIT_OBJ_COMMIT:
         if log:
             log.warn("Ignoring tag %s pointing at %s %s" % (
                 tag.id.hex, tag_pointer.__class__.__name__,
                 tag_pointer.id.hex), extra={
                     'swh_type': 'loader_git_tag_ignore',
                     'swh_repo': repo.path,
                     'swh_tag_id': tag.id.hex,
                     'swh_tag_dest': {
                         'type': tag_pointer.__class__.__name__,
                         'id': tag_pointer.id.hex,
                     },
                 })
         return

     if not tag.tagger:
         if log:
             log.warn("Tag %s has no author, using default values"
                      % id.hex, extra={
                          'swh_type': 'loader_git_tag_author_default',
                          'swh_repo': repo.path,
                          'swh_tag_id': tag.id.hex,
                      })
         author = None
         date = None
     else:
         author = {
             'name': tag.tagger.raw_name,
             'email': tag.tagger.raw_email,
         }
         date = format_date(tag.tagger)

     return {
         'id': id.raw,
         'date': date,
         'target': tag.target.raw,
         'target_type': 'revision',
         'message': tag._message,
         'name': tag.name.raw,
         'author': author,
         'metadata': None,
         'synthetic': False,
     }


 def ref_to_occurrence(ref):
     """Format a reference as an occurrence"""
     occ = ref.copy()
     if 'branch' in ref:
         branch = ref['branch']
         if isinstance(branch, str):
             occ['branch'] = branch.encode('utf-8')
         else:
             occ['branch'] = branch
     return occ


 def origin_url_to_origin(origin_url):
     """Format a pygit2.Repository as an origin suitable for swh.storage"""
     return {
         'type': 'git',
         'url': origin_url,
     }
+
+
+def dulwich_blob_to_content(blob, log=None, max_content_size=None,
+                            origin_id=None):
+    """Convert a dulwich blob to a Software Heritage content"""
+
+    size = blob.raw_length()
+
+    ret = {
+        'sha1_git': blob.sha().digest(),
+        'length': size,
+        'status': 'absent'
+    }
+
+    if max_content_size:
+        if size > max_content_size:
+            if log:
+                log.info('Skipping content %s, too large (%s > %s)' %
+                         (blob.id.decode(), size, max_content_size), extra={
+                             'swh_type': 'loader_git_content_skip',
+                             'swh_id': blob.id.decode(),
+                             'swh_size': size,
+                         })
+            ret['reason'] = 'Content too large'
+            ret['origin'] = origin_id
+            return ret
+
+    data = blob.as_raw_string()
+    hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
+    ret.update(hashes)
+    ret['data'] = data
+    ret['status'] = 'visible'
+
+    return ret
+
+
+def dulwich_tree_to_directory(tree, log=None):
+    """Format a tree as a directory"""
+    ret = {
+        'id': tree.sha().digest(),
+    }
+    entries = []
+    ret['entries'] = entries
+
+    entry_mode_map = {
+        0o040000: 'dir',
+        0o160000: 'rev',
+        0o100644: 'file',
+        0o100755: 'file',
+        0o120000: 'file',
+    }
+
+    for entry in tree.iteritems():
+        entries.append({
+            'type': entry_mode_map[entry.mode],
+            'perms': entry.mode,
+            'name': entry.path,
+            'target': hashutil.hex_to_hash(entry.sha.decode('ascii')),
+        })
+
+    return ret
+
+
+def dulwich_commit_to_revision(commit, log=None):
+    pass
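+
+
+# A possible body for dulwich_commit_to_revision, sketched from the pygit2
+# commit_to_revision above; it is not wired into DULWICH_CONVERTERS yet.
+# Assumptions (not established by this change): dulwich Commit exposes
+# author/committer as b"Name <email>", *_time/*_timezone integers, and
+# tree/parents as hex bytes, and swh.storage accepts the same date layout
+# that format_date produces for pygit2 signatures. The names
+# dulwich_commit_to_revision_draft and split_signature are illustrative.
+def dulwich_commit_to_revision_draft(commit, log=None):
+    def split_signature(signature):
+        # signature looks like b"Jane Doe <jane@example.com>" (naive parse)
+        name, _, rest = signature.partition(b' <')
+        return {'name': name, 'email': rest.rstrip(b'>')}
+
+    return {
+        'id': commit.sha().digest(),
+        'date': {'timestamp': commit.author_time,
+                 'offset': commit.author_timezone // 60},
+        'committer_date': {'timestamp': commit.commit_time,
+                           'offset': commit.commit_timezone // 60},
+        'type': 'git',
+        'directory': hashutil.hex_to_hash(commit.tree.decode('ascii')),
+        'message': commit.message,
+        'metadata': None,
+        'author': split_signature(commit.author),
+        'committer': split_signature(commit.committer),
+        'synthetic': False,
+        'parents': [hashutil.hex_to_hash(p.decode('ascii'))
+                    for p in commit.parents],
+    }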
+
+
+DULWICH_CONVERTERS = {
+    b'blob': dulwich_blob_to_content,
+    b'tree': dulwich_tree_to_directory,
+    b'commit': lambda x: x,
+    b'tag': lambda x: x,
+}
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
index e413f0b..0884482 100644
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/updater.py
@@ -1,166 +1,176 @@
 # Copyright (C) 2015  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-import dulwich.client
+from io import BytesIO
 import logging
 import sys

 from collections import defaultdict

+import dulwich.client
 from dulwich.object_store import ObjectStoreGraphWalker
+from dulwich.pack import PackData, PackInflater
 from urllib.parse import urlparse

 from swh.core import config, hashutil
 from swh.storage import get_storage

+from . import converters
+

 class BulkUpdater(config.SWHConfig):
     """A bulk loader for a git repository"""
     CONFIG_BASE_FILENAME = 'updater.ini'

     DEFAULT_CONFIG = {
         'storage_class': ('str', 'remote_storage'),
         'storage_args': ('list[str]', ['http://localhost:5000/']),

         'send_contents': ('bool', True),
         'send_directories': ('bool', True),
         'send_revisions': ('bool', True),
         'send_releases': ('bool', True),
         'send_occurrences': ('bool', True),

         'content_packet_size': ('int', 10000),
         'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
         'directory_packet_size': ('int', 25000),
         'revision_packet_size': ('int', 100000),
         'release_packet_size': ('int', 100000),
         'occurrence_packet_size': ('int', 100000),
     }

     def __init__(self, config):
         self.config = config
         self.storage = get_storage(config['storage_class'],
                                    config['storage_args'])
         self.log = logging.getLogger('swh.loader.git.BulkLoader')


 class SWHRepoRepresentation:
     """Repository representation for a Software Heritage origin."""
     def __init__(self, storage, origin_url):
         self.storage = storage
+
+        self._parents_cache = {}
+
         origin = storage.origin_get({'url': origin_url, 'type': 'git'})
         if origin:
             origin_id = origin['id']
-
-            self.parents = self._cache_parents(origin_id)
             self.heads = self._cache_heads(origin_id)
             self.tags = self._cache_tags(origin_id)
         else:
-            raise ValueError('Unexpected error, the origin %s was not found.'
-                             % origin_url)
-
-    def _cache_parents(self, origin_id):
-        """Return an id -> parent_ids mapping for the repository at
-        `origin_id`"""
-        occurrences = self.storage.occurrence_get(origin_id)
-        root_revisions = (occ['revision'] for occ in occurrences)
-
-        ret_parents = defaultdict(list)
-        for revision in self.storage.revision_log(root_revisions):
-            rev_id = hashutil.hash_to_bytehex(revision['id'])
-            for parent in revision['parents']:
-                parent_id = hashutil.hash_to_bytehex(parent)
-                ret_parents[rev_id].append(parent_id)
-
-        return ret_parents
+            self.heads = []
+            self.tags = []
+
+    def _fill_parents_cache(self, commit):
+        """When querying for a commit's parents, we fill the cache to a depth
+        of 100 commits."""
+        root_rev = hashutil.hex_to_hash(commit.decode())
+        for rev, parents in self.storage.revision_shortlog([root_rev], 100):
+            rev_id = hashutil.hash_to_bytehex(rev)
+            if rev_id not in self._parents_cache:
+                self._parents_cache[rev_id] = [
+                    hashutil.hash_to_bytehex(parent) for parent in parents
+                ]

     def _cache_heads(self, origin_id):
         """Return all the known head commits for `origin_id`"""
-        for revision in self.storage.revision_get_by(origin_id,
-                                                     branch_name=None,
-                                                     timestamp=None,
-                                                     limit=None):
-            yield hashutil.hash_to_bytehex(revision['id'])
+        return [
+            hashutil.hash_to_bytehex(revision['id'])
+            for revision in self.storage.revision_get_by(
+                origin_id, branch_name=None, timestamp=None, limit=None)
+        ]

     def _cache_tags(self, origin_id):
         """Return all the tag objects pointing to heads of `origin_id`"""
-        for release in self.storage.release_get_by(origin_id):
-            yield hashutil.hash_to_bytehex(release['id'])
+        return [
+            hashutil.hash_to_bytehex(release['id'])
+            for release in self.storage.release_get_by(origin_id)
+        ]

     def get_parents(self, commit):
         """get the parent commits for `commit`"""
-        print('########################### Request commit %s' % commit)
-        return self.parents[commit]
+        if commit not in self._parents_cache:
+            self._fill_parents_cache(commit)
+        return self._parents_cache.get(commit, [])

     def get_heads(self):
-        print('########################### Request heads!')
         return self.heads

     def get_tags(self):
-        print('########################### Request tags!')
         return self.tags

     def graph_walker(self):
         return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
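+
+    # Illustration (an aside, not required by this change): during fetch-pack
+    # negotiation, dulwich drives the walker roughly as
+    #     walker = ObjectStoreGraphWalker(heads, get_parents)
+    #     walker.next()    # emit a known commit as a "have"
+    #     walker.ack(sha)  # the server found a common ancestor
+    # so get_parents() is called repeatedly; _fill_parents_cache batches
+    # those lookups into one storage round-trip per 100 commits.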

     def determine_wants(self, refs):
         all_objs = set()
         objs_by_id = defaultdict(list)
         for ref, id in refs.items():
             objs_by_id[id].append(ref)
             if ref.endswith(b'^{}'):
                 continue
             if ref.startswith(b'refs/tags/'):
                 all_objs.add(id)
             if ref.startswith(b'refs/pull/'):
                 if not ref.endswith(b'/merge'):
                     all_objs.add(id)
                 continue
             if not ref.startswith(b'refs/pull/'):
                 all_objs.add(id)
                 continue

         ret = list(all_objs - set(self.get_heads()) - set(self.get_tags()))
         return ret


-def fetch_pack_from_origin(storage, origin_url, buf):
+def fetch_pack_from_origin(storage, origin_url, base_url, pack_buffer,
+                           activity_buffer):
-    def report_activity(arg):
-        print('########################### Report activity %s!' % arg)
-        # sys.stderr.buffer.write(arg)
-        # sys.stderr.buffer.flush()
-
-    repo = SWHRepoRepresentation(storage, origin_url)
+    base_repo = SWHRepoRepresentation(storage, base_url)
     parsed_uri = urlparse(origin_url)
     path = parsed_uri.path
     if not path.endswith('.git'):
         path += '.git'

     client = dulwich.client.TCPGitClient(parsed_uri.netloc,
                                          thin_packs=False)

     pack = client.fetch_pack(path.encode('ascii'),
-                             repo.determine_wants,
-                             repo.graph_walker(),
-                             buf.write,
-                             progress=report_activity)
-
-    # refs = client.get_refs(path.encode('ascii'))
-    # print(refs)
+                             base_repo.determine_wants,
+                             base_repo.graph_walker(),
+                             pack_buffer.write,
+                             progress=activity_buffer.write)

     return pack


 if __name__ == '__main__':
     config = BulkUpdater.parse_config_file(
         base_filename='updater.ini'
     )

     bulkupdater = BulkUpdater(config)

     origin_url = sys.argv[1]

-    pack = fetch_pack_from_origin(bulkupdater.storage,
-                                  origin_url,
-                                  sys.stdout.buffer)
-    print(pack)
+    base_url = origin_url
+    if len(sys.argv) > 2:
+        base_url = sys.argv[2]
+
+    pack = BytesIO()
+    refs = fetch_pack_from_origin(
+        bulkupdater.storage, origin_url, base_url, pack, sys.stderr.buffer)
+
+    pack_size = pack.tell()
+    pack.seek(0)
+
+    pack_data = PackInflater.for_pack_data(
+        PackData.from_file(pack, pack_size))
+
+    objs_per_type = defaultdict(list)
+    for obj in pack_data:
+        obj_type = obj.type_name
+        conv = converters.DULWICH_CONVERTERS[obj_type]
+        objs_per_type[obj_type].append(conv(obj))
+
+    print({k: len(l) for k, l in objs_per_type.items()})
+    print(len(refs))
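+
+# Example invocation (hypothetical URLs): fetch a pack for origin_url while
+# negotiating against the history already loaded for base_url. TCPGitClient
+# speaks the git:// daemon protocol, so the URLs must use that scheme:
+#
+#   python3 -m swh.loader.git.updater \
+#       git://example.org/repo.git git://example.org/base-repo.git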