diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index ef3cc4b..472668b 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,319 +1,341 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 """Convert pygit2 objects to dictionaries suitable for swh.storage"""

 from pygit2 import GIT_OBJ_COMMIT

 from swh.core import hashutil

 from .utils import format_date

 HASH_ALGORITHMS = ['sha1', 'sha256']


 def blob_to_content(id, repo, log=None, max_content_size=None,
                     origin_id=None):
     """Format a blob as a content"""
     blob = repo[id]
     size = blob.size
     ret = {
         'sha1_git': id.raw,
         'length': blob.size,
         'status': 'absent'
     }

     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (id.hex, size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_repo': repo.path,
                              'swh_id': id.hex,
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret

     data = blob.data
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
     ret['data'] = data
     ret['status'] = 'visible'

     return ret


 def tree_to_directory(id, repo, log=None):
     """Format a tree as a directory"""
     ret = {
         'id': id.raw,
     }
     entries = []
     ret['entries'] = entries

     entry_type_map = {
         'tree': 'dir',
         'blob': 'file',
         'commit': 'rev',
     }

     for entry in repo[id]:
         entries.append({
             'type': entry_type_map[entry.type],
             'perms': entry.filemode,
             'name': entry._name,
             'target': entry.id.raw,
         })

     return ret


 def commit_to_revision(id, repo, log=None):
     """Format a commit as a revision"""
     commit = repo[id]

     author = commit.author
     committer = commit.committer
     return {
         'id': id.raw,
         'date': format_date(author),
         'committer_date': format_date(committer),
         'type': 'git',
         'directory': commit.tree_id.raw,
         'message': commit.raw_message,
         'metadata': None,
         'author': {
             'name': author.raw_name,
             'email': author.raw_email,
         },
         'committer': {
             'name': committer.raw_name,
             'email': committer.raw_email,
         },
         'synthetic': False,
         'parents': [p.raw for p in commit.parent_ids],
     }


 def annotated_tag_to_release(id, repo, log=None):
     """Format an annotated tag as a release"""
     tag = repo[id]

     tag_pointer = repo[tag.target]
     if tag_pointer.type != GIT_OBJ_COMMIT:
         if log:
             log.warn("Ignoring tag %s pointing at %s %s" % (
                 tag.id.hex, tag_pointer.__class__.__name__,
                 tag_pointer.id.hex), extra={
                     'swh_type': 'loader_git_tag_ignore',
                     'swh_repo': repo.path,
                     'swh_tag_id': tag.id.hex,
                     'swh_tag_dest': {
                         'type': tag_pointer.__class__.__name__,
                         'id': tag_pointer.id.hex,
                     },
                 })
         return

     if not tag.tagger:
         if log:
             log.warn("Tag %s has no author, using default values" % id.hex,
                      extra={
                          'swh_type': 'loader_git_tag_author_default',
                          'swh_repo': repo.path,
                          'swh_tag_id': tag.id.hex,
                      })
         author = None
         date = None
     else:
         author = {
             'name': tag.tagger.raw_name,
             'email': tag.tagger.raw_email,
         }
         date = format_date(tag.tagger)

     return {
         'id': id.raw,
         'date': date,
         'target': tag.target.raw,
         'target_type': 'revision',
         'message': tag._message,
         'name': tag.name.raw,
         'author': author,
         'metadata': None,
         'synthetic': False,
     }


 def ref_to_occurrence(ref):
     """Format a reference as an occurrence"""
     occ = ref.copy()
     if 'branch' in ref:
         branch = ref['branch']
         if isinstance(branch, str):
             occ['branch'] = branch.encode('utf-8')
         else:
             occ['branch'] = branch
     return occ


 def origin_url_to_origin(origin_url):
-    """Format a pygit2.Repository as an origin suitable for swh.storage"""
+    """Format an origin URL as an origin suitable for swh.storage"""
     return {
         'type': 'git',
         'url': origin_url,
     }


 def dulwich_blob_to_content(blob, log=None, max_content_size=None,
                             origin_id=None):
     """Convert a dulwich blob to a Software Heritage content"""
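+    # The pack inflater yields objects of every type; return None for
+    # anything that is not a blob so callers can simply skip it.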
+    if blob.type_name != b'blob':
+        return
+
     size = blob.raw_length()
     ret = {
         'sha1_git': blob.sha().digest(),
         'length': size,
         'status': 'absent'
     }

     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
-                         (blob.id.encode(), size, max_content_size), extra={
+                         (blob.id.decode(), size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
-                             'swh_id': id.hex,
+                             'swh_id': blob.id.decode(),
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret

     data = blob.as_raw_string()
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
     ret['data'] = data
     ret['status'] = 'visible'

     return ret


 def dulwich_tree_to_directory(tree, log=None):
     """Format a tree as a directory"""
+    if tree.type_name != b'tree':
+        return
+
     ret = {
         'id': tree.sha().digest(),
     }
     entries = []
     ret['entries'] = entries

     entry_mode_map = {
         0o040000: 'dir',
         0o160000: 'rev',
         0o100644: 'file',
         0o100755: 'file',
         0o120000: 'file',
     }

     for entry in tree.iteritems():
         entries.append({
-            'type': entry_mode_map[entry.mode],
+            'type': entry_mode_map.get(entry.mode, 'file'),
             'perms': entry.mode,
             'name': entry.path,
             'target': hashutil.hex_to_hash(entry.sha.decode('ascii')),
         })

     return ret


 def parse_author(name_email):
     """Parse an author line"""
+    if not name_email:
+        return None
+
     name, email = name_email.split(b' <', 1)
     email = email[:-1]

     return {
         'name': name,
         'email': email,
     }


 def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc):
     """Convert the dulwich timestamp information to a structure compatible with
     Software Heritage"""
     return {
         'timestamp': timestamp,
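+        # dulwich gives the UTC offset in seconds; swh stores minutes.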
-        'offset': timezone,
+        'offset': timezone // 60,
         'negative_utc': timezone_neg_utc if timezone == 0 else None,
     }


 def dulwich_commit_to_revision(commit, log=None):
+    if commit.type_name != b'commit':
+        return
+
     ret = {
         'id': commit.sha().digest(),
         'author': parse_author(commit.author),
         'date': dulwich_tsinfo_to_timestamp(
             commit.author_time,
             commit.author_timezone,
             commit._author_timezone_neg_utc,
         ),
         'committer': parse_author(commit.committer),
         'committer_date': dulwich_tsinfo_to_timestamp(
             commit.commit_time,
             commit.commit_timezone,
             commit._commit_timezone_neg_utc,
         ),
         'type': 'git',
         'directory': bytes.fromhex(commit.tree.decode()),
         'message': commit.message,
         'metadata': None,
         'synthetic': False,
         'parents': [bytes.fromhex(p.decode()) for p in commit.parents],
     }

+    git_metadata = []
+    if commit.mergetag:
+        for mergetag in commit.mergetag:
+            git_metadata.append(['mergetag', mergetag.as_raw_string()])
+
+    if commit.extra:
+        git_metadata.extend([k, v] for k, v in commit.extra)
+
+    if commit.gpgsig:
+        git_metadata.append(['gpgsig', commit.gpgsig])
+
+    if git_metadata:
+        ret['metadata'] = {
+            'extra_git_headers': git_metadata,
+        }
+
     return ret


 DULWICH_TYPES = {
     b'blob': 'content',
     b'tree': 'directory',
     b'commit': 'revision',
     b'tag': 'release',
 }


-def dulwich_tag_to_revision(tag, log=None):
-    target, target_type = tag.object
+def dulwich_tag_to_release(tag, log=None):
+    if tag.type_name != b'tag':
+        return
+
+    target_type, target = tag.object
     ret = {
         'id': tag.sha().digest(),
         'name': tag.name,
-        'author': parse_author(tag.tagger),
-        'date': dulwich_tsinfo_to_timestamp(
-            tag.tag_time,
-            tag.tag_timezone,
-            tag._tag_timezone_neg_utc,
-        ),
         'target': bytes.fromhex(target.decode()),
-        'target_type': DULWICH_TYPES[target_type],
+        'target_type': DULWICH_TYPES[target_type.type_name],
         'message': tag._message,
         'metadata': None,
         'synthetic': False,
     }

+    if tag.tagger:
+        ret['author'] = parse_author(tag.tagger)
+        ret['date'] = dulwich_tsinfo_to_timestamp(
+            tag.tag_time,
+            tag.tag_timezone,
+            tag._tag_timezone_neg_utc,
+        )
+    else:
+        ret['author'] = ret['date'] = None
+
     return ret
-
-
-def dulwich_ref_to_occurrence(ref, objects, timestamp, log=None):
-    pass
-
-
-DULWICH_CONVERTERS = {
-    b'blob': dulwich_blob_to_content,
-    b'tree': dulwich_tree_to_directory,
-    b'commit': dulwich_commit_to_revision,
-    b'tag': lambda x: x,
-}
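As a quick sanity check of the converters above, a minimal sketch of driving them by hand — assuming a checkout where `swh.loader.git` and its `swh.core` dependency are importable; the in-memory dulwich objects and values are illustrative only:

```python
from dulwich.objects import Blob, Commit, Tree

from swh.loader.git import converters

# A blob below any size limit comes back fully hashed and 'visible'.
blob = Blob.from_string(b'hello world\n')
content = converters.dulwich_blob_to_content(blob)
assert content['status'] == 'visible'
assert content['length'] == 12

# A hand-built commit; dulwich keeps timezone offsets in seconds.
commit = Commit()
commit.tree = Tree().id
commit.author = commit.committer = b'Jane Doe <jane@example.com>'
commit.author_time = commit.commit_time = 1449619200
commit.author_timezone = commit.commit_timezone = 3600  # UTC+01:00
commit.message = b'test commit'

revision = converters.dulwich_commit_to_revision(commit)
assert revision['date']['offset'] == 60   # minutes, per the converter
assert revision['author'] == {'name': b'Jane Doe',
                              'email': b'jane@example.com'}
```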
diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py
index ee20e9f..28bf644 100644
--- a/swh/loader/git/updater.py
+++ b/swh/loader/git/updater.py
@@ -1,296 +1,651 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 from io import BytesIO
 import logging
 import sys
+import traceback
+import uuid

 from collections import defaultdict

 import dulwich.client
 from dulwich.object_store import ObjectStoreGraphWalker
 from dulwich.pack import PackData, PackInflater
+import psycopg2
+import requests
+from retrying import retry
 from urllib.parse import urlparse

 from swh.core import config, hashutil
 from swh.storage import get_storage

 from . import converters


-class BulkUpdater(config.SWHConfig):
-    """A bulk loader for a git repository"""
-    CONFIG_BASE_FILENAME = 'updater.ini'
-
-    DEFAULT_CONFIG = {
-        'storage_class': ('str', 'remote_storage'),
-        'storage_args': ('list[str]', ['http://localhost:5000/']),
-
-        'send_contents': ('bool', True),
-        'send_directories': ('bool', True),
-        'send_revisions': ('bool', True),
-        'send_releases': ('bool', True),
-        'send_occurrences': ('bool', True),
-
-        'content_packet_size': ('int', 10000),
-        'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
-        'directory_packet_size': ('int', 25000),
-        'revision_packet_size': ('int', 100000),
-        'release_packet_size': ('int', 100000),
-        'occurrence_packet_size': ('int', 100000),
-    }
-
-    def __init__(self, config):
-        self.config = config
-        self.storage = get_storage(config['storage_class'],
-                                   config['storage_args'])
-        self.log = logging.getLogger('swh.loader.git.BulkLoader')
-
-
 class SWHRepoRepresentation:
     """Repository representation for a Software Heritage origin."""
-    def __init__(self, storage, origin_url):
+    def __init__(self, storage, origin_id):
         self.storage = storage

         self._parents_cache = {}
-        self._refs_cache = None
+        self._type_cache = {}

-        origin = storage.origin_get({'url': origin_url, 'type': 'git'})
-        if origin:
-            origin_id = origin['id']
+        if origin_id:
             self.heads = self._cache_heads(origin_id)
         else:
             self.heads = []

     def _fill_parents_cache(self, commit):
         """When querying for a commit's parents, we fill the cache to a depth of 100
         commits."""
-        root_rev = hashutil.hex_to_hash(commit.decode())
+        root_rev = hashutil.bytehex_to_hash(commit)
         for rev, parents in self.storage.revision_shortlog([root_rev], 100):
             rev_id = hashutil.hash_to_bytehex(rev)
             if rev_id not in self._parents_cache:
                 self._parents_cache[rev_id] = [
                     hashutil.hash_to_bytehex(parent) for parent in parents
                 ]

     def _cache_heads(self, origin_id):
         """Return all the known head commits for `origin_id`"""
         return [
             hashutil.hash_to_bytehex(revision['id'])
             for revision in self.storage.revision_get_by(
                 origin_id, branch_name=None, timestamp=None, limit=None)
         ]

     def get_parents(self, commit):
         """get the parent commits for `commit`"""
         if commit not in self._parents_cache:
             self._fill_parents_cache(commit)
         return self._parents_cache.get(commit, [])

     def get_heads(self):
         return self.heads

     @staticmethod
     def _encode_for_storage(objects):
-        return [hashutil.hex_to_hash(object.decode()) for object in objects]
+        return [hashutil.bytehex_to_hash(object) for object in objects]

     @staticmethod
     def _decode_from_storage(objects):
-        return set(hashutil.hash_to_hex(object).encode() for object in objects)
+        return set(hashutil.hash_to_bytehex(object) for object in objects)

     def get_stored_commits(self, commits):
         return commits - self._decode_from_storage(
             self.storage.revision_missing(
                 self._encode_for_storage(commits)
             )
         )

     def get_stored_tags(self, tags):
         return tags - self._decode_from_storage(
             self.storage.release_missing(
                 self._encode_for_storage(tags)
             )
         )

     def get_stored_trees(self, trees):
         return trees - self._decode_from_storage(
             self.storage.directory_missing(
                 self._encode_for_storage(trees)
             )
         )

     def get_stored_blobs(self, blobs):
-        return set()
+        ret = set()
+        for blob in blobs:
+            if self.storage.content_find({
+                    'sha1_git': hashutil.bytehex_to_hash(blob),
+            }):
+                ret.add(blob)
+
+        return ret

     def graph_walker(self):
         return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)

     @staticmethod
     def filter_unwanted_refs(refs):
         """Filter the unwanted references from refs"""
         ret = {}
         for ref, val in refs.items():
             if ref.endswith(b'^{}'):
                 # Peeled refs make the git protocol explode
                 continue
             elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
                 # We filter-out auto-merged GitHub pull requests
                 continue
             else:
                 ret[ref] = val

         return ret

     def determine_wants(self, refs):
-        refs = self.parse_refs(refs)
+        if not refs:
+            return []
+
+        refs = self.parse_local_refs(refs)
         ret = set()
         for ref, target in self.filter_unwanted_refs(refs).items():
             if target['target_type'] is None:
                 # The target doesn't exist in Software Heritage
                 ret.add(target['target'])

         return list(ret)

-    def parse_refs(self, remote_refs):
+    def parse_local_refs(self, remote_refs):
         """Parse the remote refs information and list the objects that exist in
         Software Heritage"""
-        if self._refs_cache is not None:
-            return self._refs_cache
-
-        all_objs = set(remote_refs.values())
-        type_by_id = defaultdict(None)
+        all_objs = set(remote_refs.values()) - set(self._type_cache)
+        type_by_id = {}

         tags = self.get_stored_tags(all_objs)
         all_objs -= tags
         for tag in tags:
             type_by_id[tag] = 'release'

         commits = self.get_stored_commits(all_objs)
         all_objs -= commits
         for commit in commits:
             type_by_id[commit] = 'revision'

         trees = self.get_stored_trees(all_objs)
         all_objs -= trees
         for tree in trees:
             type_by_id[tree] = 'directory'

         blobs = self.get_stored_blobs(all_objs)
         all_objs -= blobs
         for blob in blobs:
             type_by_id[blob] = 'content'

+        self._type_cache.update(type_by_id)
+
         ret = {}
         for ref, id in remote_refs.items():
             ret[ref] = {
                 'target': id,
-                'target_type': type_by_id.get(id),
+                'target_type': self._type_cache.get(id),
             }

-        self._refs_cache = ret
         return ret


+def send_in_packets(source_list, formatter, sender, packet_size,
+                    packet_size_bytes=None, *args, **kwargs):
+    """Send objects from `source_list`, passed through `formatter` (with
+    extra args *args, **kwargs), using the `sender`, in packets of
+    `packet_size` objects (and of max `packet_size_bytes`).
+
+    """
+    formatted_objects = []
+    count = 0
+    if not packet_size_bytes:
+        packet_size_bytes = 0
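+    # A falsy value from `formatter` means "skip this object".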
+ + """ + formatted_objects = [] + count = 0 + if not packet_size_bytes: + packet_size_bytes = 0 + for obj in source_list: + formatted_object = formatter(obj, *args, **kwargs) + if formatted_object: + formatted_objects.append(formatted_object) + else: + continue + if packet_size_bytes: + count += formatted_object['length'] + if len(formatted_objects) >= packet_size or count > packet_size_bytes: + sender(formatted_objects) + formatted_objects = [] + count = 0 + + if formatted_objects: + sender(formatted_objects) + + +def retry_loading(error): + """Retry policy when we catch a recoverable error""" + exception_classes = [ + # raised when two parallel insertions insert the same data. + psycopg2.IntegrityError, + # raised when uWSGI restarts and hungs up on the worker. + requests.exceptions.ConnectionError, + ] + + if not any(isinstance(error, exc) for exc in exception_classes): + return False + + logger = logging.getLogger('swh.loader.git.BulkLoader') + + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry loading a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exception( + error.__class__, + error, + error.__traceback__, + ), + }) + + return True + + +class BulkUpdater(config.SWHConfig): + """A bulk loader for a git repository""" + CONFIG_BASE_FILENAME = 'loader/git/updater.ini' + DEFAULT_CONFIG = { + 'storage_class': ('str', 'remote_storage'), + 'storage_args': ('list[str]', ['http://localhost:5000/']), -def fetch_pack_from_origin(storage, origin_url, base_url, activity_buffer): - """Fetch a pack from the origin""" - pack_buffer = BytesIO() + 'send_contents': ('bool', True), + 'send_directories': ('bool', True), + 'send_revisions': ('bool', True), + 'send_releases': ('bool', True), + 'send_occurrences': ('bool', True), - base_repo = SWHRepoRepresentation(storage, base_url) + 'content_packet_size': ('int', 10000), + 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), + 'directory_packet_size': ('int', 25000), + 'revision_packet_size': ('int', 100000), + 'release_packet_size': ('int', 100000), + 'occurrence_packet_size': ('int', 100000), + } - parsed_uri = urlparse(origin_url) + def __init__(self, config): + self.config = config + self.storage = get_storage(config['storage_class'], + config['storage_args']) + self.log = logging.getLogger('swh.loader.git.BulkLoader') - path = parsed_uri.path - if not path.endswith('.git'): - path += '.git' + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_contents(self, content_list): + """Actually send properly formatted contents to the database""" + num_contents = len(content_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + self.storage.content_add(content_list) + self.log.debug("Done sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_directories(self, directory_list): + """Actually send properly formatted directories to the database""" + num_directories = len(directory_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 
+        client = dulwich.client.TCPGitClient(parsed_uri.netloc,
+                                             thin_packs=False)
+
+        def do_pack(data, pack_buffer=pack_buffer):
+            pack_buffer.write(data)
+
+        remote_refs = client.fetch_pack(path.encode('ascii'),
+                                        base_repo.determine_wants,
+                                        base_repo.graph_walker(),
+                                        do_pack,
+                                        progress=do_activity)
+
+        if remote_refs:
+            local_refs = base_repo.parse_local_refs(remote_refs)
+        else:
+            local_refs = remote_refs = {}
+
+        pack_buffer.flush()
+        pack_size = pack_buffer.tell()
+        pack_buffer.seek(0)
+        return {
+            'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
+            'local_refs': local_refs,
+            'pack_buffer': pack_buffer,
+            'pack_size': pack_size,
+        }
+
+    def get_origin(self, origin_url):
+        origin = converters.origin_url_to_origin(origin_url)
+
+        return self.storage.origin_get(origin)
+
+    def get_or_create_origin(self, origin_url):
+        origin = converters.origin_url_to_origin(origin_url)
+
+        origin['id'] = self.storage.origin_add_one(origin)
+
+        return origin
+
+    def create_origin(self, origin_url):
+        log_id = str(uuid.uuid4())
+        self.log.debug('Creating origin for %s' % origin_url,
+                       extra={
+                           'swh_type': 'storage_send_start',
+                           'swh_content_type': 'origin',
+                           'swh_num': 1,
+                           'swh_id': log_id
+                       })
+        origin = self.get_or_create_origin(origin_url)
+        self.log.debug('Done creating origin for %s' % origin_url,
+                       extra={
+                           'swh_type': 'storage_send_end',
+                           'swh_content_type': 'origin',
+                           'swh_num': 1,
+                           'swh_id': log_id
+                       })
+
+        return origin
+
+    def bulk_send_blobs(self, inflater, origin_id):
+        """Format blobs as swh contents and send them to the database"""
+        packet_size = self.config['content_packet_size']
+        packet_size_bytes = self.config['content_packet_size_bytes']
+        max_content_size = self.config['content_size_limit']
+
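+        # Contents larger than content_size_limit are sent as 'absent' stubs.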
+        send_in_packets(inflater, converters.dulwich_blob_to_content,
+                        self.send_contents, packet_size,
+                        packet_size_bytes=packet_size_bytes,
+                        log=self.log, max_content_size=max_content_size,
+                        origin_id=origin_id)
+
+    def bulk_send_trees(self, inflater):
+        """Format trees as swh directories and send them to the database"""
+        packet_size = self.config['directory_packet_size']
+
+        send_in_packets(inflater, converters.dulwich_tree_to_directory,
+                        self.send_directories, packet_size,
+                        log=self.log)
+
+    def bulk_send_commits(self, inflater):
+        """Format commits as swh revisions and send them to the database"""
+        packet_size = self.config['revision_packet_size']
+
+        send_in_packets(inflater, converters.dulwich_commit_to_revision,
+                        self.send_revisions, packet_size,
+                        log=self.log)
+
+    def bulk_send_tags(self, inflater):
+        """Format annotated tags (dulwich.objects.Tag objects) as swh
+        releases and send them to the database
+        """
+        packet_size = self.config['release_packet_size']
+
+        send_in_packets(inflater, converters.dulwich_tag_to_release,
+                        self.send_releases, packet_size,
+                        log=self.log)
+
+    def bulk_send_refs(self, refs):
+        """Format git references as swh occurrences and send them to the
+        database
+        """
+        packet_size = self.config['occurrence_packet_size']
+
+        send_in_packets(refs, lambda x: x, self.send_occurrences, packet_size)
+
+    def open_fetch_history(self, origin_id):
+        return self.storage.fetch_history_start(origin_id)
+
+    def close_fetch_history_success(self, fetch_history_id, objects, refs):
+        data = {
+            'status': True,
+            'result': {
+                'contents': len(objects[b'blob']),
+                'directories': len(objects[b'tree']),
+                'revisions': len(objects[b'commit']),
+                'releases': len(objects[b'tag']),
+                'occurrences': len(refs),
+            },
+        }
+        return self.storage.fetch_history_end(fetch_history_id, data)
+
+    def close_fetch_history_failure(self, fetch_history_id):
+        data = {
+            'status': False,
+            'stderr': traceback.format_exc(),
+        }
+        return self.storage.fetch_history_end(fetch_history_id, data)
+
+    def get_inflater(self, pack_buffer, pack_size):
+        """Reset the pack buffer and get an object inflater from it"""
+        pack_buffer.seek(0)
+        return PackInflater.for_pack_data(
+            PackData.from_file(pack_buffer, pack_size))
+
+    def list_pack(self, pack_data, pack_size):
+        id_to_type = {}
+        type_to_ids = defaultdict(set)
+
+        inflater = self.get_inflater(pack_data, pack_size)
+
+        for obj in inflater:
+            type, id = obj.type_name, obj.id
+            id_to_type[id] = type
+            type_to_ids[type].add(id)
+
+        return id_to_type, type_to_ids
+
+    def list_refs(self, remote_refs, local_refs, id_to_type, origin_id, date):
+        ret = []
+        for ref in remote_refs:
+            ret_ref = local_refs[ref].copy()
+            ret_ref.update({
+                'branch': ref,
+                'origin': origin_id,
+                'date': date,
+            })
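+            # Objects unknown to storage get their type from the pack.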
+            if not ret_ref['target_type']:
+                target_type = id_to_type[ret_ref['target']]
+                ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
+
+            ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
+
+            ret.append(ret_ref)

-    client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False)
+        return ret

-    def do_pack(data, pack_buffer=pack_buffer):
-        pack_buffer.write(data)
+    def load_pack(self, pack_buffer, pack_size, refs, origin_id):
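+        # The pack buffer is re-read for each object type: a PackInflater
+        # can only be iterated once.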
+        if self.config['send_contents']:
+            self.bulk_send_blobs(self.get_inflater(pack_buffer, pack_size),
+                                 origin_id)
+        else:
+            self.log.info('Not sending contents')

-    def do_activity(data, activity_buffer=activity_buffer):
-        activity_buffer.write(data)
-        activity_buffer.flush()
+        if self.config['send_directories']:
+            self.bulk_send_trees(self.get_inflater(pack_buffer, pack_size))
+        else:
+            self.log.info('Not sending directories')

-    remote_refs = client.fetch_pack(path.encode('ascii'),
-                                    base_repo.determine_wants,
-                                    base_repo.graph_walker(),
-                                    do_pack,
-                                    progress=do_activity)
+        if self.config['send_revisions']:
+            self.bulk_send_commits(self.get_inflater(pack_buffer, pack_size))
+        else:
+            self.log.info('Not sending revisions')

-    local_refs = base_repo.parse_refs(remote_refs)
+        if self.config['send_releases']:
+            self.bulk_send_tags(self.get_inflater(pack_buffer, pack_size))
+        else:
+            self.log.info('Not sending releases')

-    pack_buffer.flush()
-    pack_size = pack_buffer.tell()
-    pack_buffer.seek(0)
-    return {
-        'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
-        'local_refs': local_refs,
-        'pack_buffer': pack_buffer,
-        'pack_size': pack_size,
-    }
+        if self.config['send_occurrences']:
+            self.bulk_send_refs(refs)
+        else:
+            self.log.info('Not sending occurrences')
+
+    def process(self, origin_url, base_url):
+        date = datetime.datetime.now(tz=datetime.timezone.utc)
+
+        # Add origin to storage if needed, use the stored one if not
+        origin = self.create_origin(origin_url)
+        base_origin = self.get_origin(base_url)
+        base_origin_id = base_origin['id'] if base_origin else None
+
+        # Create fetch_history
+        fetch_history = self.open_fetch_history(origin['id'])
+        closed = False
+
+        def do_progress(msg):
+            sys.stderr.buffer.write(msg)
+            sys.stderr.flush()
+
+        try:
+            fetch_info = self.fetch_pack_from_origin(
+                origin_url, base_origin_id, do_progress)
+
+            pack_buffer = fetch_info['pack_buffer']
+            pack_size = fetch_info['pack_size']
+
+            remote_refs = fetch_info['remote_refs']
+            local_refs = fetch_info['local_refs']
+            if not remote_refs:
+                self.log.info('Skipping empty repository %s' % origin_url,
+                              extra={
+                                  'swh_type': 'git_repo_list_refs',
+                                  'swh_repo': origin_url,
+                                  'swh_num_refs': 0,
+                              })
+                # End fetch_history
+                self.close_fetch_history_success(fetch_history,
+                                                 defaultdict(set), [])
+                closed = True
+                return
+            else:
+                self.log.info('Listed %d refs for repo %s' % (
+                    len(remote_refs), origin_url), extra={
+                        'swh_type': 'git_repo_list_refs',
+                        'swh_repo': origin_url,
+                        'swh_num_refs': len(remote_refs),
+                    })
+
+            # We want to load the repository, walk all the objects
+            id_to_type, type_to_ids = self.list_pack(pack_buffer, pack_size)

-def refs_to_occurrences(remote_refs, local_refs, types_per_id, origin_id,
-                        timestamp):
-    """Merge references remote and local"""
-    ret = {}
-    for ref, data in remote_refs.items():
-        ret[ref] = local_refs[ref].copy()
-        ret[ref].update({
-            'branch': ref,
-            'origin': origin_id,
-            'validity': timestamp,
-        })
-        if not ret[ref]['target_type']:
-            target_type = types_per_id[remote_refs[ref]]
-            ret[ref]['target_type'] = converters.DULWICH_TYPES[target_type]
+            # Parse the remote references and add info from the local ones
+            refs = self.list_refs(remote_refs, local_refs,
+                                  id_to_type, origin['id'], date)

-    return ret
+            # Finally, load the repository
+            self.load_pack(pack_buffer, pack_size, refs, origin['id'])
+
+            # End fetch_history
+            self.close_fetch_history_success(fetch_history, type_to_ids, refs)
+            closed = True
+
+        finally:
+            if not closed:
+                self.close_fetch_history_failure(fetch_history)


 if __name__ == '__main__':
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(process)d %(message)s'
+    )
     config = BulkUpdater.parse_config_file(
-        base_filename='updater.ini'
+        base_filename='loader/git/updater.ini'
     )

     bulkupdater = BulkUpdater(config)

     origin_url = sys.argv[1]
     base_url = origin_url
     if len(sys.argv) > 2:
         base_url = sys.argv[2]

-    fetch_info = fetch_pack_from_origin(
-        bulkupdater.storage, origin_url, base_url, sys.stderr.buffer)
-
-    pack_data = PackInflater.for_pack_data(
-        PackData.from_file(fetch_info['pack_buffer'], fetch_info['pack_size']))
-
-    objs_per_type = defaultdict(list)
-    types_per_id = {}
-    for obj in pack_data:
-        obj_type = obj.type_name
-        types_per_id[obj.id] = obj_type
-        conv = converters.DULWICH_CONVERTERS[obj_type]
-        objs_per_type[obj_type].append(conv(obj))
-        print([(k, len(l)) for k, l in sorted(objs_per_type.items())],
-              file=sys.stderr, end='\r')
-
-    remote_refs = fetch_info['remote_refs']
-    local_refs = fetch_info['local_refs']
-
-    origin_id = 42
-    now = datetime.datetime.now()
-
-    occurrences = refs_to_occurrences(
-        remote_refs, local_refs, types_per_id, origin_id, now)
-
-    for branch in occurrences:
-        print(occurrences[branch])
-    objs_per_type[b'refs'] = remote_refs
-
-    print({k: len(l) for k, l in objs_per_type.items()})
+    bulkupdater.process(origin_url, base_url)
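Everything the updater sends flows through `send_in_packets`. A minimal sketch of its batching contract, runnable against a checkout of this module — the sender and sample objects are hypothetical stand-ins for the storage-backed ones above:

```python
from swh.loader.git.updater import send_in_packets

packets = []

def fake_sender(batch):
    """Record each flushed packet instead of hitting swh.storage."""
    packets.append(list(batch))

# Ten formatted objects, flushed in packets of at most four; a falsy
# return from the formatter would drop the object instead.
send_in_packets(range(10), lambda obj: {'id': obj, 'length': 1},
                fake_sender, packet_size=4)

assert [len(p) for p in packets] == [4, 4, 2]
```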