diff --git a/bin/test-bulk-loader b/bin/test-bulk-loader index 5678fc8..64e76e1 100755 --- a/bin/test-bulk-loader +++ b/bin/test-bulk-loader @@ -1,13 +1,43 @@ #!/usr/bin/env python3 import logging import sys import pygit2 -from swh.loader.git.git import parse_via_object_list +from swh.core import config +from swh.loader.git.git import BulkLoader + +DEFAULT_CONFIG = { + 'db': ('str', 'dbname=softwareheritage-dev'), + 'storage_base': ('str', '/tmp/swh-loader-git/test'), + 'repo_path': ('str', None), + + 'origin': ('int', -1), + 'authority': ('int', 1), + 'validity': ('str', '2015-01-01 00:00:00+00'), + + 'create_origin': ('bool', True), + 'send_contents': ('bool', True), + 'send_directories': ('bool', True), + 'send_revisions': ('bool', True), + 'send_releases': ('bool', True), + 'send_occurrences': ('bool', True), + + 'content_packet_size': ('int', 100000), + 'directory_packet_size': ('int', 25000), + 'revision_packet_size': ('int', 100000), + 'release_packet_size': ('int', 100000), + 'occurrence_packet_size': ('int', 100000), +} logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s') -parse_via_object_list(pygit2.discover_repository(sys.argv[1])) +my_config = {key: value for key, (type, value) in DEFAULT_CONFIG.items()} + +repo_path = pygit2.discover_repository(sys.argv[1]) +my_config['repo_path'] = repo_path + +loader = BulkLoader(my_config) +loader.process() diff --git a/swh/loader/git/git.py b/swh/loader/git/git.py index fbf443e..d5bd193 100644 --- a/swh/loader/git/git.py +++ b/swh/loader/git/git.py @@ -1,363 +1,395 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import glob import logging import os import subprocess import pygit2 from collections import defaultdict from pygit2 import Oid from pygit2 import GIT_OBJ_BLOB, GIT_OBJ_TREE, GIT_OBJ_COMMIT, GIT_OBJ_TAG from swh.core import hashutil from swh.storage.storage import Storage def format_date(signature): """Convert the date from a signature to a datetime""" return datetime.datetime.fromtimestamp(signature.time, datetime.timezone.utc) def list_objects_from_packfile_index(packfile_index): """List the objects indexed by this packfile, in packfile offset order. """ input_file = open(packfile_index, 'rb') with subprocess.Popen( ['/usr/bin/git', 'show-index'], stdin=input_file, stdout=subprocess.PIPE, ) as process: data = [] for line in process.stdout.readlines(): # git show-index returns the line as: # () line_components = line.split() offset = int(line_components[0]) object_id = line_components[1] data.append((offset, object_id)) yield from (Oid(hex=object_id.decode('ascii')) for _, object_id in sorted(data)) input_file.close() def simple_list_objects(repo): """List the objects in a given repository. Watch out for duplicates!""" objects_dir = os.path.join(repo.path, 'objects') # Git hashes are 40-character long objects_glob = os.path.join(objects_dir, '[0-9a-f]' * 2, '[0-9a-f]' * 38) packfile_dir = os.path.join(objects_dir, 'pack') if os.path.isdir(packfile_dir): for packfile_index in os.listdir(packfile_dir): if not packfile_index.endswith('.idx'): # Not an index file continue packfile_index_path = os.path.join(packfile_dir, packfile_index) yield from list_objects_from_packfile_index(packfile_index_path) for object_file in glob.glob(objects_glob): # Rebuild the object id as the last two components of the path yield Oid(hex=''.join(object_file.split(os.path.sep)[-2:])) def list_objects(repo): """List the objects in a given repository, removing duplicates""" seen = set() for oid in simple_list_objects(repo): if oid not in seen: yield oid seen.add(oid) def get_objects_per_object_type(repo): """Get all the (pygit2-parsed) objects from repo per object type""" objects_per_object_type = defaultdict(list) for object_id in list_objects(repo): object = repo[object_id] objects_per_object_type[object.type].append(object_id) return objects_per_object_type HASH_ALGORITHMS = ['sha1', 'sha256'] -def send_in_packets(repo, source_list, formatter, sender, packet_size): - """Send objects from `source_list`, passed through `formatter` (being - passed the `repo`), by the `sender`, in packets - of `packet_size` objects +def send_in_packets(source_list, formatter, sender, packet_size): + """Send objects from `source_list`, passed through `formatter`, by the + `sender`, in packets of `packet_size` objects """ formatted_objects = [] for obj in source_list: - formatted_object = formatter(repo, obj) + formatted_object = formatter(obj) if formatted_object: formatted_objects.append(formatted_object) if len(formatted_objects) >= packet_size: sender(formatted_objects) formatted_objects = [] sender(formatted_objects) -def send_contents(content_list): - """Actually send properly formatted contents to the database""" - logging.info("Sending %d contents" % len(content_list)) - s = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-git/test') - - s.content_add(content_list) - logging.info("Done sending %d contents" % len(content_list)) - - -def send_directories(directory_list): - """Actually send properly formatted directories to the database""" - logging.info("Sending %d directories" % len(directory_list)) - s = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-git/test') - - s.directory_add(directory_list) - logging.info("Done sending %d directories" % len(directory_list)) - - -def send_revisions(revision_list): - """Actually send properly formatted revisions to the database""" - logging.info("Sending %d revisions" % len(revision_list)) - s = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-git/test') - - s.revision_add(revision_list) - logging.info("Done sending %d revisions" % len(revision_list)) - - -def send_releases(release_list): - """Actually send properly formatted releases to the database""" - logging.info("Sending %d releases" % len(release_list)) - s = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-git/test') - - s.release_add(release_list) - logging.info("Done sending %d releases" % len(release_list)) - - -def send_occurrences(occurrence_list): - """Actually send properly formatted occurrences to the database""" - logging.info("Sending %d occurrences" % len(occurrence_list)) - s = Storage('dbname=softwareheritage-dev', '/tmp/swh-loader-git/test') - - s.occurrence_add(occurrence_list) - logging.info("Done sending %d occurrences" % len(occurrence_list)) - - -def blob_to_content(repo, id): - """Format a blob as a content""" - blob = repo[id] - data = blob.data - hashes = hashutil.hashdata(data, HASH_ALGORITHMS) - return { - 'sha1_git': id.raw, - 'sha1': hashes['sha1'], - 'sha256': hashes['sha256'], - 'data': data, - 'length': blob.size, - } - - -def tree_to_directory(repo, id): - """Format a tree as a directory""" - ret = { - 'id': id.raw, - } - entries = [] - ret['entries'] = entries - - entry_type_map = { - 'tree': 'dir', - 'blob': 'file', - 'commit': 'rev', - } - - for entry in repo[id]: - entries.append({ - 'type': entry_type_map[entry.type], - 'perms': entry.filemode, - 'name': entry.name, - 'target': entry.id.raw, - 'atime': None, - 'mtime': None, - 'ctime': None, - }) - - return ret - - -def commit_to_revision(repo, id): - """Format a commit as a revision""" - commit = repo[id] - - author = commit.author - committer = commit.committer - return { - 'id': id.raw, - 'date': format_date(author), - 'date_offset': author.offset, - 'committer_date': format_date(committer), - 'committer_date_offset': committer.offset, - 'type': 'git', - 'directory': commit.tree_id.raw, - 'message': commit.raw_message, - 'author_name': author.name, - 'author_email': author.email, - 'committer_name': committer.name, - 'committer_email': committer.email, - 'parents': [p.raw for p in commit.parent_ids], - } - - -def annotated_tag_to_release(repo, id): - """Format an annotated tag as a release""" - tag = repo[id] - - tag_pointer = repo[tag.target] - if tag_pointer.type != GIT_OBJ_COMMIT: - logging.warn("Ignoring tag %s pointing at %s %s" % ( - tag.id.hex, tag_pointer.__class__.__name__, tag_pointer.id.hex)) - return - - author = tag.tagger - - if not author: - logging.warn("Tag %s has no author, using default values" % id.hex) - author_name = '' - author_email = '' - date = None - date_offset = 0 - else: - author_name = author.name - author_email = author.email - date = format_date(author) - date_offset = author.offset - - return { - 'id': id.raw, - 'date': date, - 'date_offset': date_offset, - 'revision': tag.target.raw, - 'comment': tag.message.encode('utf-8'), - 'author_name': author_name, - 'author_email': author_email, - } - - -def ref_to_occurrence(repo, ref): - """Format a reference as an occurrence""" - return ref - - -def bulk_send_blobs(repo, blob_dict): - """Format blobs as swh contents and send them to the database in bulks - of maximum `threshold` objects - - """ - # TODO: move to config file - content_packet_size = 100000 - - send_in_packets(repo, blob_dict, blob_to_content, send_contents, content_packet_size) - - -def bulk_send_trees(repo, tree_dict): - """Format trees as swh directories and send them to the database - - """ - # TODO: move to config file - directory_packet_size = 25000 - - send_in_packets(repo, tree_dict, tree_to_directory, send_directories, directory_packet_size) - - -def bulk_send_commits(repo, commit_dict): - """Format commits as swh revisions and send them to the database - - """ - # TODO: move to config file - revision_packet_size = 100000 - - send_in_packets(repo, commit_dict, commit_to_revision, send_revisions, revision_packet_size) - - -def bulk_send_annotated_tags(repo, tag_dict): - """Format annotated tags (pygit2.Tag objects) as swh releases and send - them to the database - - """ - # TODO: move to config file - release_packet_size = 100000 - - send_in_packets(repo, tag_dict, annotated_tag_to_release, send_releases, release_packet_size) - - -def bulk_send_refs(repo, refs): - """Format git references as swh occurrences and send them to the database - """ - # TODO: move to config file - occurrence_packet_size = 100000 - - send_in_packets(repo, refs, ref_to_occurrence, - send_occurrences, occurrence_packet_size) - - -def parse_via_object_list(repo_path): - logging.info("Started loading %s" % repo_path) - repo = pygit2.Repository(repo_path) - objects_per_object_type = get_objects_per_object_type(repo) - - refs = [] - ref_names = repo.listall_references() - for ref_name in ref_names: - ref = repo.lookup_reference(ref_name) - target = ref.target - - if not isinstance(target, Oid): - logging.info("Skipping symbolic ref %s pointing at %s" % ( - ref_name, ref.target)) - continue - - target_obj = repo[target] - - if not target_obj.type == GIT_OBJ_COMMIT: - logging.info("Skipping ref %s pointing to %s %s" % ( - ref_name, target_obj.__class__.__name__, target.hex)) - - refs.append({ - 'name': ref_name, - 'revision': target.hex, - }) - - logging.info("Done listing the objects in %s: will load %d contents, " - "%d directories, %d revisions, %d releases, " - "%d occurrences" % ( - repo_path, - len(objects_per_object_type[GIT_OBJ_BLOB]), - len(objects_per_object_type[GIT_OBJ_TREE]), - len(objects_per_object_type[GIT_OBJ_COMMIT]), - len(objects_per_object_type[GIT_OBJ_TAG]), - len(refs) - )) - - bulk_send_blobs(repo, objects_per_object_type[GIT_OBJ_BLOB]) - bulk_send_trees(repo, objects_per_object_type[GIT_OBJ_TREE]) - bulk_send_commits(repo, objects_per_object_type[GIT_OBJ_COMMIT]) - bulk_send_annotated_tags(repo, objects_per_object_type[GIT_OBJ_TAG]) - bulk_send_refs(repo, refs) +class BulkLoader: + """A bulk loader for a git repository""" + def __init__(self, config, storage_class=Storage): + self.config = config + self.storage = storage_class(config['db'], config['storage_base']) + + self.repo = pygit2.Repository(config['repo_path']) + + self.log = logging + + def send_contents(self, content_list): + """Actually send properly formatted contents to the database""" + self.log.info("Sending %d contents" % len(content_list)) + self.storage.content_add(content_list) + self.log.info("Done sending %d contents" % len(content_list)) + + def send_directories(self, directory_list): + """Actually send properly formatted directories to the database""" + self.log.info("Sending %d directories" % len(directory_list)) + self.storage.directory_add(directory_list) + self.log.info("Done sending %d directories" % len(directory_list)) + + def send_revisions(self, revision_list): + """Actually send properly formatted revisions to the database""" + self.log.info("Sending %d revisions" % len(revision_list)) + self.storage.revision_add(revision_list) + self.log.info("Done sending %d revisions" % len(revision_list)) + + def send_releases(self, release_list): + """Actually send properly formatted releases to the database""" + self.log.info("Sending %d releases" % len(release_list)) + self.storage.release_add(release_list) + self.log.info("Done sending %d releases" % len(release_list)) + + def send_occurrences(self, occurrence_list): + """Actually send properly formatted occurrences to the database""" + self.log.info("Sending %d occurrences" % len(occurrence_list)) + self.storage.occurrence_add(occurrence_list) + self.log.info("Done sending %d occurrences" % len(occurrence_list)) + + def blob_to_content(self, id): + """Format a blob as a content""" + blob = self.repo[id] + data = blob.data + hashes = hashutil.hashdata(data, HASH_ALGORITHMS) + return { + 'sha1_git': id.raw, + 'sha1': hashes['sha1'], + 'sha256': hashes['sha256'], + 'data': data, + 'length': blob.size, + } + + def tree_to_directory(self, id): + """Format a tree as a directory""" + ret = { + 'id': id.raw, + } + entries = [] + ret['entries'] = entries + + entry_type_map = { + 'tree': 'dir', + 'blob': 'file', + 'commit': 'rev', + } + + for entry in self.repo[id]: + entries.append({ + 'type': entry_type_map[entry.type], + 'perms': entry.filemode, + 'name': entry.name, + 'target': entry.id.raw, + 'atime': None, + 'mtime': None, + 'ctime': None, + }) + + return ret + + def commit_to_revision(self, id): + """Format a commit as a revision""" + commit = self.repo[id] + + author = commit.author + committer = commit.committer + return { + 'id': id.raw, + 'date': format_date(author), + 'date_offset': author.offset, + 'committer_date': format_date(committer), + 'committer_date_offset': committer.offset, + 'type': 'git', + 'directory': commit.tree_id.raw, + 'message': commit.raw_message, + 'author_name': author.name, + 'author_email': author.email, + 'committer_name': committer.name, + 'committer_email': committer.email, + 'parents': [p.raw for p in commit.parent_ids], + } + + def annotated_tag_to_release(self, id): + """Format an annotated tag as a release""" + tag = self.repo[id] + + tag_pointer = self.repo[tag.target] + if tag_pointer.type != GIT_OBJ_COMMIT: + self.log.warn("Ignoring tag %s pointing at %s %s" % ( + tag.id.hex, tag_pointer.__class__.__name__, + tag_pointer.id.hex)) + return + + author = tag.tagger + + if not author: + self.log.warn("Tag %s has no author, using default values" + % id.hex) + author_name = '' + author_email = '' + date = None + date_offset = 0 + else: + author_name = author.name + author_email = author.email + date = format_date(author) + date_offset = author.offset + + return { + 'id': id.raw, + 'date': date, + 'date_offset': date_offset, + 'revision': tag.target.raw, + 'comment': tag.message.encode('utf-8'), + 'author_name': author_name, + 'author_email': author_email, + } + + def ref_to_occurrence(self, ref): + """Format a reference as an occurrence""" + ref = ref.copy() + + ref.update(origin=self.origin, authority=self.config['authority'], + validity=self.config['validity']) + + return ref + + def create_origin(self): + origin = { + 'type': 'git', + 'url': 'file://%s' % self.config['repo_path'], + } + + id = self.storage.origin_get(origin) + if not id: + id = self.storage.origin_add_one(origin) + + self.origin = id + + def bulk_send_blobs(self, blob_dict): + """Format blobs as swh contents and send them to the database""" + packet_size = self.config['content_packet_size'] + + send_in_packets(blob_dict, self.blob_to_content, + self.send_contents, packet_size) + + def bulk_send_trees(self, tree_dict): + """Format trees as swh directories and send them to the database""" + packet_size = self.config['directory_packet_size'] + + send_in_packets(tree_dict, self.tree_to_directory, + self.send_directories, packet_size) + + def bulk_send_commits(self, commit_dict): + """Format commits as swh revisions and send them to the database""" + packet_size = self.config['revision_packet_size'] + + send_in_packets(commit_dict, self.commit_to_revision, + self.send_revisions, packet_size) + + def bulk_send_annotated_tags(self, tag_dict): + """Format annotated tags (pygit2.Tag objects) as swh releases and send + them to the database + """ + packet_size = self.config['release_packet_size'] + + send_in_packets(tag_dict, self.annotated_tag_to_release, + self.send_releases, packet_size) + + def bulk_send_refs(self, refs): + """Format git references as swh occurrences and send them to the + database + """ + packet_size = self.config['occurrence_packet_size'] + + send_in_packets(refs, self.ref_to_occurrence, + self.send_occurrences, packet_size) + + def list_repo(self): + self.log.info("Started listing %s" % self.config['repo_path']) + self.objects = get_objects_per_object_type(self.repo) + + refs = [] + ref_names = self.repo.listall_references() + for ref_name in ref_names: + ref = self.repo.lookup_reference(ref_name) + target = ref.target + + if not isinstance(target, Oid): + self.log.debug("Peeling symbolic ref %s pointing at %s" % ( + ref_name, ref.target)) + target_obj = ref.peel() + else: + target_obj = self.repo[target] + + if target_obj.type == GIT_OBJ_TAG: + self.log.debug("Peeling ref %s pointing at tag %s" % ( + ref_name, target_obj.name)) + target_obj = ref.peel() + + if not target_obj.type == GIT_OBJ_COMMIT: + self.log.info("Skipping ref %s pointing to %s %s" % ( + ref_name, target_obj.__class__.__name__, + target_obj.id.hex)) + + refs.append({ + 'branch': ref_name, + 'revision': target_obj.id.raw, + }) + + self.objects['refs'] = refs + + self.log.info("Done listing the objects in %s: %d contents, " + "%d directories, %d revisions, %d releases, " + "%d occurrences" % ( + self.config['repo_path'], + len(self.objects[GIT_OBJ_BLOB]), + len(self.objects[GIT_OBJ_TREE]), + len(self.objects[GIT_OBJ_COMMIT]), + len(self.objects[GIT_OBJ_TAG]), + len(self.objects['refs']) + )) + + def load_repo(self): + if self.config['create_origin']: + self.create_origin() + else: + self.log.info('Not creating origin, pulling id from config') + self.origin = self.config['origin'] + + if self.config['send_contents']: + self.bulk_send_blobs(self.objects[GIT_OBJ_BLOB]) + else: + self.log.info('Not sending contents') + + if self.config['send_directories']: + self.bulk_send_trees(self.objects[GIT_OBJ_TREE]) + else: + self.log.info('Not sending directories') + + if self.config['send_revisions']: + self.bulk_send_commits(self.objects[GIT_OBJ_COMMIT]) + else: + self.log.info('Not sending revisions') + + if self.config['send_releases']: + self.bulk_send_annotated_tags(self.objects[GIT_OBJ_TAG]) + else: + self.log.info('Not sending releases') + + if self.config['send_occurrences']: + self.bulk_send_refs(self.objects['refs']) + else: + self.log.info('Not sending occurrences') + + def process(self): + self.list_repo() + self.load_repo()