diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index a2d887b..ef3cc4b 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,242 +1,319 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Convert pygit2 objects to dictionaries suitable for swh.storage"""
 
 from pygit2 import GIT_OBJ_COMMIT
 
 from swh.core import hashutil
 
 from .utils import format_date
 
 HASH_ALGORITHMS = ['sha1', 'sha256']
 
 
 def blob_to_content(id, repo, log=None, max_content_size=None,
                     origin_id=None):
     """Format a blob as a content"""
     blob = repo[id]
     size = blob.size
     ret = {
         'sha1_git': id.raw,
         'length': blob.size,
         'status': 'absent'
     }
 
     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (id.hex, size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_repo': repo.path,
                              'swh_id': id.hex,
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret
 
     data = blob.data
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
 
     ret['data'] = data
     ret['status'] = 'visible'
 
     return ret
 
 
 def tree_to_directory(id, repo, log=None):
     """Format a tree as a directory"""
     ret = {
         'id': id.raw,
     }
     entries = []
     ret['entries'] = entries
 
     entry_type_map = {
         'tree': 'dir',
         'blob': 'file',
         'commit': 'rev',
     }
 
     for entry in repo[id]:
         entries.append({
             'type': entry_type_map[entry.type],
             'perms': entry.filemode,
             'name': entry._name,
             'target': entry.id.raw,
         })
 
     return ret
 
 
 def commit_to_revision(id, repo, log=None):
     """Format a commit as a revision"""
     commit = repo[id]
 
     author = commit.author
     committer = commit.committer
     return {
         'id': id.raw,
         'date': format_date(author),
         'committer_date': format_date(committer),
         'type': 'git',
         'directory': commit.tree_id.raw,
         'message': commit.raw_message,
         'metadata': None,
         'author': {
             'name': author.raw_name,
             'email': author.raw_email,
         },
         'committer': {
             'name': committer.raw_name,
             'email': committer.raw_email,
         },
         'synthetic': False,
         'parents': [p.raw for p in commit.parent_ids],
     }
 
 
 def annotated_tag_to_release(id, repo, log=None):
     """Format an annotated tag as a release"""
     tag = repo[id]
 
     tag_pointer = repo[tag.target]
     if tag_pointer.type != GIT_OBJ_COMMIT:
         if log:
             log.warn("Ignoring tag %s pointing at %s %s" % (
                 tag.id.hex, tag_pointer.__class__.__name__,
                 tag_pointer.id.hex), extra={
                     'swh_type': 'loader_git_tag_ignore',
                     'swh_repo': repo.path,
                     'swh_tag_id': tag.id.hex,
                     'swh_tag_dest': {
                         'type': tag_pointer.__class__.__name__,
                         'id': tag_pointer.id.hex,
                     },
                 })
         return
 
     if not tag.tagger:
         if log:
             log.warn("Tag %s has no author, using default values" % id.hex,
                      extra={
                          'swh_type': 'loader_git_tag_author_default',
                          'swh_repo': repo.path,
                          'swh_tag_id': tag.id.hex,
                      })
         author = None
         date = None
     else:
         author = {
             'name': tag.tagger.raw_name,
             'email': tag.tagger.raw_email,
         }
         date = format_date(tag.tagger)
 
     return {
         'id': id.raw,
         'date': date,
         'target': tag.target.raw,
         'target_type': 'revision',
         'message': tag._message,
         'name': tag.name.raw,
         'author': author,
         'metadata': None,
         'synthetic': False,
     }
 
 
 def ref_to_occurrence(ref):
     """Format a reference as an occurrence"""
     occ = ref.copy()
     if 'branch' in ref:
         branch = ref['branch']
         if isinstance(branch, str):
             occ['branch'] = branch.encode('utf-8')
         else:
             occ['branch'] = branch
     return occ
 
 
 def origin_url_to_origin(origin_url):
     """Format a pygit2.Repository as an origin suitable for swh.storage"""
     return {
         'type': 'git',
         'url': origin_url,
     }
 
 
 def dulwich_blob_to_content(blob, log=None, max_content_size=None,
                             origin_id=None):
     """Convert a dulwich blob to a Software Heritage content"""
     size = blob.raw_length()
     ret = {
         'sha1_git': blob.sha().digest(),
         'length': size,
         'status': 'absent'
     }
 
     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (blob.id.encode(), size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_id': id.hex,
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret
 
     data = blob.as_raw_string()
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
 
     ret['data'] = data
     ret['status'] = 'visible'
 
     return ret
 
 
 def dulwich_tree_to_directory(tree, log=None):
     """Format a tree as a directory"""
     ret = {
         'id': tree.sha().digest(),
     }
     entries = []
     ret['entries'] = entries
 
     entry_mode_map = {
         0o040000: 'dir',
         0o160000: 'rev',
         0o100644: 'file',
         0o100755: 'file',
         0o120000: 'file',
     }
 
     for entry in tree.iteritems():
         entries.append({
             'type': entry_mode_map[entry.mode],
             'perms': entry.mode,
             'name': entry.path,
             'target': hashutil.hex_to_hash(entry.sha.decode('ascii')),
         })
 
     return ret
 
 
+def parse_author(name_email):
+    """Parse an author line"""
+
+    name, email = name_email.split(b' <', 1)
+    email = email[:-1]
+
+    return {
+        'name': name,
+        'email': email,
+    }
+
+
+def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc):
+    """Convert the dulwich timestamp information to a structure compatible with
+    Software Heritage"""
+    return {
+        'timestamp': timestamp,
+        'offset': timezone,
+        'negative_utc': timezone_neg_utc if timezone == 0 else None,
+    }
+
+
 def dulwich_commit_to_revision(commit, log=None):
+    ret = {
+        'id': commit.sha().digest(),
+        'author': parse_author(commit.author),
+        'date': dulwich_tsinfo_to_timestamp(
+            commit.author_time,
+            commit.author_timezone,
+            commit._author_timezone_neg_utc,
+        ),
+        'committer': parse_author(commit.committer),
+        'committer_date': dulwich_tsinfo_to_timestamp(
+            commit.commit_time,
+            commit.commit_timezone,
+            commit._commit_timezone_neg_utc,
+        ),
+        'type': 'git',
+        'directory': bytes.fromhex(commit.tree.decode()),
+        'message': commit.message,
+        'metadata': None,
+        'synthetic': False,
+        'parents': [bytes.fromhex(p.decode()) for p in commit.parents],
+    }
+
+    return ret
+
+
+DULWICH_TYPES = {
+    b'blob': 'content',
+    b'tree': 'directory',
+    b'commit': 'revision',
+    b'tag': 'release',
+}
+
+
+def dulwich_tag_to_revision(tag, log=None):
+    target, target_type = tag.object
+    ret = {
+        'id': tag.sha().digest(),
+        'name': tag.name,
+        'author': parse_author(tag.tagger),
+        'date': dulwich_tsinfo_to_timestamp(
+            tag.tag_time,
+            tag.tag_timezone,
+            tag._tag_timezone_neg_utc,
+        ),
+        'target': bytes.fromhex(target.decode()),
+        'target_type': DULWICH_TYPES[target_type],
+        'message': tag._message,
+        'metadata': None,
+        'synthetic': False,
+    }
+
+    return ret
+
+
+def dulwich_ref_to_occurrence(ref, objects, timestamp, log=None):
     pass
 
 
 DULWICH_CONVERTERS = {
     b'blob': dulwich_blob_to_content,
     b'tree': dulwich_tree_to_directory,
-    b'commit': lambda x: x,
+    b'commit': dulwich_commit_to_revision,
     b'tag': lambda x: x,
 }
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
index 0884482..ee20e9f 100644
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/updater.py
@@ -1,176 +1,296 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 from io import BytesIO
 import logging
 import sys
 
 from collections import defaultdict
 
 import dulwich.client
 from dulwich.object_store import ObjectStoreGraphWalker
 from dulwich.pack import PackData, PackInflater
 from urllib.parse import urlparse
 
 from swh.core import config, hashutil
 from swh.storage import get_storage
 
 from . import converters
 
 
 class BulkUpdater(config.SWHConfig):
     """A bulk loader for a git repository"""
     CONFIG_BASE_FILENAME = 'updater.ini'
 
     DEFAULT_CONFIG = {
         'storage_class': ('str', 'remote_storage'),
         'storage_args': ('list[str]', ['http://localhost:5000/']),
 
         'send_contents': ('bool', True),
         'send_directories': ('bool', True),
         'send_revisions': ('bool', True),
         'send_releases': ('bool', True),
         'send_occurrences': ('bool', True),
 
         'content_packet_size': ('int', 10000),
         'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
         'directory_packet_size': ('int', 25000),
         'revision_packet_size': ('int', 100000),
         'release_packet_size': ('int', 100000),
         'occurrence_packet_size': ('int', 100000),
     }
 
     def __init__(self, config):
         self.config = config
         self.storage = get_storage(config['storage_class'],
                                    config['storage_args'])
         self.log = logging.getLogger('swh.loader.git.BulkLoader')
 
 
 class SWHRepoRepresentation:
     """Repository representation for a Software Heritage origin."""
     def __init__(self, storage, origin_url):
         self.storage = storage
 
         self._parents_cache = {}
+        self._refs_cache = None
 
         origin = storage.origin_get({'url': origin_url, 'type': 'git'})
         if origin:
             origin_id = origin['id']
             self.heads = self._cache_heads(origin_id)
-            self.tags = self._cache_tags(origin_id)
         else:
             self.heads = []
-            self.tags = []
 
     def _fill_parents_cache(self, commit):
         """When querying for a commit's parents, we fill the cache to a depth of 100
         commits."""
         root_rev = hashutil.hex_to_hash(commit.decode())
         for rev, parents in self.storage.revision_shortlog([root_rev], 100):
             rev_id = hashutil.hash_to_bytehex(rev)
             if rev_id not in self._parents_cache:
                 self._parents_cache[rev_id] = [
                     hashutil.hash_to_bytehex(parent) for parent in parents
                 ]
 
     def _cache_heads(self, origin_id):
         """Return all the known head commits for `origin_id`"""
         return [
             hashutil.hash_to_bytehex(revision['id'])
             for revision in self.storage.revision_get_by(
                 origin_id, branch_name=None, timestamp=None, limit=None)
         ]
 
-    def _cache_tags(self, origin_id):
-        """Return all the tag objects pointing to heads of `origin_id`"""
-        return [
-            hashutil.hash_to_bytehex(release['id'])
-            for release in self.storage.release_get_by(origin_id)
-        ]
-
     def get_parents(self, commit):
         """get the parent commits for `commit`"""
         if commit not in self._parents_cache:
             self._fill_parents_cache(commit)
         return self._parents_cache.get(commit, [])
 
     def get_heads(self):
         return self.heads
 
-    def get_tags(self):
-        return self.tags
+    @staticmethod
+    def _encode_for_storage(objects):
+        return [hashutil.hex_to_hash(object.decode()) for object in objects]
+
+    @staticmethod
+    def _decode_from_storage(objects):
+        return set(hashutil.hash_to_hex(object).encode() for object in objects)
+
+    def get_stored_commits(self, commits):
+        return commits - self._decode_from_storage(
+            self.storage.revision_missing(
+                self._encode_for_storage(commits)
+            )
+        )
+
+    def get_stored_tags(self, tags):
+        return tags - self._decode_from_storage(
+            self.storage.release_missing(
+                self._encode_for_storage(tags)
+            )
+        )
+
+    def get_stored_trees(self, trees):
+        return trees - self._decode_from_storage(
+            self.storage.directory_missing(
+                self._encode_for_storage(trees)
+            )
+        )
+
+    def get_stored_blobs(self, blobs):
+        return set()
 
     def graph_walker(self):
         return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
 
-    def determine_wants(self, refs):
-        all_objs = set()
-        objs_by_id = defaultdict(list)
-        for ref, id in refs.items():
-            objs_by_id[id].append(ref)
+    @staticmethod
+    def filter_unwanted_refs(refs):
+        """Filter the unwanted references from refs"""
+        ret = {}
+        for ref, val in refs.items():
             if ref.endswith(b'^{}'):
+                # Peeled refs make the git protocol explode
                 continue
-            if ref.startswith(b'refs/tags/'):
-                all_objs.add(id)
-            if ref.startswith(b'refs/pull/'):
-                if not ref.endswith(b'/merge'):
-                    all_objs.add(id)
-                continue
-            if not ref.startswith(b'refs/pull/'):
-                all_objs.add(id)
+            elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
+                # We filter-out auto-merged GitHub pull requests
                 continue
+            else:
+                ret[ref] = val
+
+        return ret
+
+    def determine_wants(self, refs):
+        refs = self.parse_refs(refs)
+        ret = set()
+        for ref, target in self.filter_unwanted_refs(refs).items():
+            if target['target_type'] is None:
+                # The target doesn't exist in Software Heritage
+                ret.add(target['target'])
+
+        return list(ret)
+
+    def parse_refs(self, remote_refs):
+        """Parse the remote refs information and list the objects that exist in
+        Software Heritage"""
+        if self._refs_cache is not None:
+            return self._refs_cache
+
+        all_objs = set(remote_refs.values())
+        type_by_id = defaultdict(None)
+
+        tags = self.get_stored_tags(all_objs)
+        all_objs -= tags
+        for tag in tags:
+            type_by_id[tag] = 'release'
+
+        commits = self.get_stored_commits(all_objs)
+        all_objs -= commits
+        for commit in commits:
+            type_by_id[commit] = 'revision'
+
+        trees = self.get_stored_trees(all_objs)
+        all_objs -= trees
+        for tree in trees:
+            type_by_id[tree] = 'directory'
+
+        blobs = self.get_stored_blobs(all_objs)
+        all_objs -= blobs
+        for blob in blobs:
+            type_by_id[blob] = 'content'
+
+        ret = {}
+        for ref, id in remote_refs.items():
+            ret[ref] = {
+                'target': id,
+                'target_type': type_by_id.get(id),
+            }
+
+        self._refs_cache = ret
 
-        ret = list(all_objs - set(self.get_heads()) - set(self.get_tags()))
         return ret
 
 
-def fetch_pack_from_origin(storage, origin_url, base_url, pack_buffer,
-                           activity_buffer):
+def fetch_pack_from_origin(storage, origin_url, base_url, activity_buffer):
+    """Fetch a pack from the origin"""
+    pack_buffer = BytesIO()
+
     base_repo = SWHRepoRepresentation(storage, base_url)
 
     parsed_uri = urlparse(origin_url)
 
     path = parsed_uri.path
     if not path.endswith('.git'):
         path += '.git'
 
     client = dulwich.client.TCPGitClient(parsed_uri.netloc,
                                          thin_packs=False)
 
-    pack = client.fetch_pack(path.encode('ascii'),
-                             base_repo.determine_wants,
-                             base_repo.graph_walker(),
-                             pack_buffer.write,
-                             progress=activity_buffer.write)
+    def do_pack(data, pack_buffer=pack_buffer):
+        pack_buffer.write(data)
+
+    def do_activity(data, activity_buffer=activity_buffer):
+        activity_buffer.write(data)
+        activity_buffer.flush()
+
+    remote_refs = client.fetch_pack(path.encode('ascii'),
+                                    base_repo.determine_wants,
+                                    base_repo.graph_walker(),
+                                    do_pack,
+                                    progress=do_activity)
+
+    local_refs = base_repo.parse_refs(remote_refs)
+
+    pack_buffer.flush()
+    pack_size = pack_buffer.tell()
+    pack_buffer.seek(0)
+
+    return {
+        'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
+        'local_refs': local_refs,
+        'pack_buffer': pack_buffer,
+        'pack_size': pack_size,
+    }
+
+
+def refs_to_occurrences(remote_refs, local_refs, types_per_id, origin_id,
+                        timestamp):
+    """Merge references remote and local"""
+    ret = {}
+    for ref, data in remote_refs.items():
+        ret[ref] = local_refs[ref].copy()
+        ret[ref].update({
+            'branch': ref,
+            'origin': origin_id,
+            'validity': timestamp,
+        })
+        if not ret[ref]['target_type']:
+            target_type = types_per_id[remote_refs[ref]]
+            ret[ref]['target_type'] = converters.DULWICH_TYPES[target_type]
+
+    return ret
 
-    return pack
 
 if __name__ == '__main__':
     config = BulkUpdater.parse_config_file(
         base_filename='updater.ini'
     )
 
     bulkupdater = BulkUpdater(config)
 
     origin_url = sys.argv[1]
     base_url = origin_url
     if len(sys.argv) > 2:
         base_url = sys.argv[2]
 
-    pack = BytesIO()
-    refs = fetch_pack_from_origin(
-        bulkupdater.storage, origin_url, base_url, pack, sys.stderr.buffer)
+    fetch_info = fetch_pack_from_origin(
+        bulkupdater.storage, origin_url, base_url, sys.stderr.buffer)
+
+    pack_data = PackInflater.for_pack_data(
+        PackData.from_file(fetch_info['pack_buffer'], fetch_info['pack_size']))
 
-    pack_size = pack.tell()
-    pack.seek(0)
-    pack_data = PackInflater.for_pack_data(PackData.from_file(pack, pack_size))
     objs_per_type = defaultdict(list)
+    types_per_id = {}
     for obj in pack_data:
         obj_type = obj.type_name
+        types_per_id[obj.id] = obj_type
         conv = converters.DULWICH_CONVERTERS[obj_type]
         objs_per_type[obj_type].append(conv(obj))
+        print([(k, len(l)) for k, l in sorted(objs_per_type.items())],
+              file=sys.stderr, end='\r')
+
+    remote_refs = fetch_info['remote_refs']
+    local_refs = fetch_info['local_refs']
+
+    origin_id = 42
+    now = datetime.datetime.now()
+
+    occurrences = refs_to_occurrences(
+        remote_refs, local_refs, types_per_id, origin_id, now)
+
+    for branch in occurrences:
+        print(occurrences[branch])
+    objs_per_type[b'refs'] = remote_refs
 
     print({k: len(l) for k, l in objs_per_type.items()})
-    print(len(refs))