diff --git a/.gitignore b/.gitignore
index f68ca8a..f91c832 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,10 +1,12 @@
 .eggs/
 /sgloader/__pycache__/
 /dataset/
 *.pyc
 /.coverage
 /scratch/swhgitloader.cProfile
 /scratch/swhgitloader.profile
 /scratch/save.p
 *.egg-info
 version.txt
+/resources/repo-linux-to-load.ini
+/resources/repo-to-load.ini
diff --git a/PKG-INFO b/PKG-INFO
index 24dfc16..baf8ab4 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.loader.git
-Version: 0.0.10
+Version: 0.0.11
 Summary: Software Heritage git loader
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/debian/control b/debian/control
index e78ccae..ef9e579 100644
--- a/debian/control
+++ b/debian/control
@@ -1,25 +1,28 @@
 Source: swh-loader-git
 Maintainer: Software Heritage developers
 Section: python
 Priority: optional
 Build-Depends: debhelper (>= 9),
                dh-python,
                python3-all,
                python3-nose,
+               python3-dulwich,
                python3-pygit2,
                python3-retrying,
                python3-setuptools,
                python3-swh.core (>= 0.0.7~),
-               python3-swh.model,
-               python3-swh.storage (>= 0.0.20~),
+               python3-swh.model (>= 0.0.3~),
+               python3-swh.scheduler,
+               python3-swh.storage (>= 0.0.31~),
                python3-vcversioner
 Standards-Version: 3.9.6
 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/
 
 Package: python3-swh.loader.git
 Architecture: all
 Depends: python3-swh.core (>= 0.0.7~),
-         python3-swh.storage (>= 0.0.20~),
+         python3-swh.storage (>= 0.0.31~),
+         python3-swh.model (>= 0.0.3~),
          ${misc:Depends}, ${python3:Depends}
 Description: Software Heritage Git loader
diff --git a/requirements.txt b/requirements.txt
index d7697e0..6723573 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,8 @@
+dulwich
 pygit2
 retrying
 vcversioner
 swh.core >= 0.0.7
-swh.model
-swh.storage >= 0.0.20
+swh.model >= 0.0.3
+swh.scheduler
+swh.storage >= 0.0.31
diff --git a/resources/updater.ini b/resources/updater.ini
new file mode 100644
index 0000000..da492be
--- /dev/null
+++ b/resources/updater.ini
@@ -0,0 +1,10 @@
+[main]
+# Where to store the logs
+log_dir = /tmp/swh-loader-git/log
+
+# how to access the backend (remote or local)
+backend-type = local
+
+# backend-type remote: url access to api rest's backend
+# backend-type local: configuration file to backend file .ini (cf. back.ini file)
+backend = ~/.config/swh/back.ini
diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO
index 24dfc16..baf8ab4 100644
--- a/swh.loader.git.egg-info/PKG-INFO
+++ b/swh.loader.git.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.loader.git
-Version: 0.0.10
+Version: 0.0.11
 Summary: Software Heritage git loader
 Home-page: https://forge.softwareheritage.org/diffusion/DCORE/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt
index 672be67..149c9a6 100644
--- a/swh.loader.git.egg-info/SOURCES.txt
+++ b/swh.loader.git.egg-info/SOURCES.txt
@@ -1,38 +1,40 @@
 .gitignore
 .gitmodules
 AUTHORS
 LICENSE
 MANIFEST.in
 Makefile
 README
 requirements.txt
 setup.py
 version.txt
 bin/dir-git-repo-meta.sh
 bin/swh-loader-git
 bin/swh-loader-git-multi
 debian/changelog
 debian/compat
 debian/control
 debian/copyright
 debian/rules
 debian/source/format
 doc/attic/api-backend-protocol.txt
 doc/attic/git-loading-design.txt
 resources/local-loader-git.ini
 resources/remote-loader-git.ini
+resources/updater.ini
 resources/test/back.ini
 resources/test/db-manager.ini
 scratch/analyse-profile.py
 scratch/repo_walk.py
 swh.loader.git.egg-info/PKG-INFO
 swh.loader.git.egg-info/SOURCES.txt
 swh.loader.git.egg-info/dependency_links.txt
 swh.loader.git.egg-info/requires.txt
 swh.loader.git.egg-info/top_level.txt
 swh/loader/git/__init__.py
 swh/loader/git/converters.py
 swh/loader/git/loader.py
 swh/loader/git/tasks.py
+swh/loader/git/updater.py
 swh/loader/git/utils.py
 swh/loader/git/tests/test_converters.py
\ No newline at end of file
diff --git a/swh.loader.git.egg-info/requires.txt b/swh.loader.git.egg-info/requires.txt
index 5f1dfe1..d2a2347 100644
--- a/swh.loader.git.egg-info/requires.txt
+++ b/swh.loader.git.egg-info/requires.txt
@@ -1,6 +1,8 @@
+dulwich
 pygit2
 retrying
 swh.core>=0.0.7
-swh.model
-swh.storage>=0.0.20
+swh.model>=0.0.3
+swh.scheduler
+swh.storage>=0.0.31
 vcversioner
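[Annotation] The new resources/updater.ini uses the same flat `[main]` INI layout as the other resources/*.ini files. For orientation only, a minimal sketch of reading it with the standard library; the path is an assumption, and the updater itself goes through swh.core's SWHConfig helpers (see `BulkUpdater.parse_config_file` below) rather than raw configparser:

```python
import configparser

# Illustrative only: shows the [main] section layout introduced above.
parser = configparser.ConfigParser()
parser.read('resources/updater.ini')  # path is an assumption

main = parser['main']
assert main['backend-type'] in ('local', 'remote')
print(main['log_dir'], main['backend'])
```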
diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py
index 9136e31..472668b 100644
--- a/swh/loader/git/converters.py
+++ b/swh/loader/git/converters.py
@@ -1,162 +1,341 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Convert pygit2 objects to dictionaries suitable for swh.storage"""
 
 from pygit2 import GIT_OBJ_COMMIT
 
 from swh.core import hashutil
 
 from .utils import format_date
 
 HASH_ALGORITHMS = ['sha1', 'sha256']
 
 
 def blob_to_content(id, repo, log=None, max_content_size=None,
                     origin_id=None):
     """Format a blob as a content"""
     blob = repo[id]
     size = blob.size
     ret = {
         'sha1_git': id.raw,
         'length': blob.size,
         'status': 'absent'
     }
 
     if max_content_size:
         if size > max_content_size:
             if log:
                 log.info('Skipping content %s, too large (%s > %s)' %
                          (id.hex, size, max_content_size), extra={
                              'swh_type': 'loader_git_content_skip',
                              'swh_repo': repo.path,
                              'swh_id': id.hex,
                              'swh_size': size,
                          })
             ret['reason'] = 'Content too large'
             ret['origin'] = origin_id
             return ret
 
     data = blob.data
     hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
     ret.update(hashes)
     ret['data'] = data
     ret['status'] = 'visible'
 
     return ret
 
 
 def tree_to_directory(id, repo, log=None):
     """Format a tree as a directory"""
     ret = {
         'id': id.raw,
     }
     entries = []
     ret['entries'] = entries
 
     entry_type_map = {
         'tree': 'dir',
         'blob': 'file',
         'commit': 'rev',
     }
 
     for entry in repo[id]:
         entries.append({
             'type': entry_type_map[entry.type],
             'perms': entry.filemode,
             'name': entry._name,
             'target': entry.id.raw,
         })
 
     return ret
 
 
 def commit_to_revision(id, repo, log=None):
     """Format a commit as a revision"""
     commit = repo[id]
 
     author = commit.author
     committer = commit.committer
     return {
         'id': id.raw,
         'date': format_date(author),
         'committer_date': format_date(committer),
         'type': 'git',
         'directory': commit.tree_id.raw,
         'message': commit.raw_message,
         'metadata': None,
         'author': {
             'name': author.raw_name,
             'email': author.raw_email,
         },
         'committer': {
             'name': committer.raw_name,
             'email': committer.raw_email,
         },
         'synthetic': False,
         'parents': [p.raw for p in commit.parent_ids],
     }
 
 
 def annotated_tag_to_release(id, repo, log=None):
     """Format an annotated tag as a release"""
     tag = repo[id]
 
     tag_pointer = repo[tag.target]
     if tag_pointer.type != GIT_OBJ_COMMIT:
         if log:
             log.warn("Ignoring tag %s pointing at %s %s" % (
                 tag.id.hex, tag_pointer.__class__.__name__,
                 tag_pointer.id.hex), extra={
                     'swh_type': 'loader_git_tag_ignore',
                     'swh_repo': repo.path,
                     'swh_tag_id': tag.id.hex,
                     'swh_tag_dest': {
                         'type': tag_pointer.__class__.__name__,
                         'id': tag_pointer.id.hex,
                     },
                 })
         return
 
     if not tag.tagger:
         if log:
             log.warn("Tag %s has no author, using default values" % id.hex,
                      extra={
                          'swh_type': 'loader_git_tag_author_default',
                          'swh_repo': repo.path,
                          'swh_tag_id': tag.id.hex,
                      })
         author = None
         date = None
     else:
         author = {
             'name': tag.tagger.raw_name,
             'email': tag.tagger.raw_email,
         }
         date = format_date(tag.tagger)
 
     return {
         'id': id.raw,
         'date': date,
         'target': tag.target.raw,
         'target_type': 'revision',
         'message': tag._message,
-        'name': tag.name.encode('utf-8'),
+        'name': tag.name.raw,
         'author': author,
         'metadata': None,
         'synthetic': False,
     }
 
 
 def ref_to_occurrence(ref):
     """Format a reference as an occurrence"""
-    return ref
+    occ = ref.copy()
+    if 'branch' in ref:
+        branch = ref['branch']
+        if isinstance(branch, str):
+            occ['branch'] = branch.encode('utf-8')
+        else:
+            occ['branch'] = branch
+    return occ
 
 
 def origin_url_to_origin(origin_url):
     """Format a pygit2.Repository as an origin suitable for swh.storage"""
     return {
         'type': 'git',
         'url': origin_url,
     }
+
+
+def dulwich_blob_to_content(blob, log=None, max_content_size=None,
+                            origin_id=None):
+    """Convert a dulwich blob to a Software Heritage content"""
+
+    if blob.type_name != b'blob':
+        return
+
+    size = blob.raw_length()
+
+    ret = {
+        'sha1_git': blob.sha().digest(),
+        'length': size,
+        'status': 'absent'
+    }
+
+    if max_content_size:
+        if size > max_content_size:
+            if log:
+                log.info('Skipping content %s, too large (%s > %s)' %
+                         (blob.id.encode(), size, max_content_size), extra={
+                             'swh_type': 'loader_git_content_skip',
+                             'swh_id': id.hex,
+                             'swh_size': size,
+                         })
+            ret['reason'] = 'Content too large'
+            ret['origin'] = origin_id
+            return ret
+
+    data = blob.as_raw_string()
+    hashes = hashutil.hashdata(data, HASH_ALGORITHMS)
+    ret.update(hashes)
+    ret['data'] = data
+    ret['status'] = 'visible'
+
+    return ret
+
+
+def dulwich_tree_to_directory(tree, log=None):
+    """Format a tree as a directory"""
+    if tree.type_name != b'tree':
+        return
+
+    ret = {
+        'id': tree.sha().digest(),
+    }
+    entries = []
+    ret['entries'] = entries
+
+    entry_mode_map = {
+        0o040000: 'dir',
+        0o160000: 'rev',
+        0o100644: 'file',
+        0o100755: 'file',
+        0o120000: 'file',
+    }
+
+    for entry in tree.iteritems():
+        entries.append({
+            'type': entry_mode_map.get(entry.mode, 'file'),
+            'perms': entry.mode,
+            'name': entry.path,
+            'target': hashutil.hex_to_hash(entry.sha.decode('ascii')),
+        })
+
+    return ret
+
+
+def parse_author(name_email):
+    """Parse an author line"""
+
+    if not name_email:
+        return None
+
+    name, email = name_email.split(b' <', 1)
+    email = email[:-1]
+
+    return {
+        'name': name,
+        'email': email,
+    }
+
+
+def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc):
+    """Convert the dulwich timestamp information to a structure compatible with
+    Software Heritage"""
+    return {
+        'timestamp': timestamp,
+        'offset': timezone // 60,
+        'negative_utc': timezone_neg_utc if timezone == 0 else None,
+    }
+
+
+def dulwich_commit_to_revision(commit, log=None):
+    if commit.type_name != b'commit':
+        return
+
+    ret = {
+        'id': commit.sha().digest(),
+        'author': parse_author(commit.author),
+        'date': dulwich_tsinfo_to_timestamp(
+            commit.author_time,
+            commit.author_timezone,
+            commit._author_timezone_neg_utc,
+        ),
+        'committer': parse_author(commit.committer),
+        'committer_date': dulwich_tsinfo_to_timestamp(
+            commit.commit_time,
+            commit.commit_timezone,
+            commit._commit_timezone_neg_utc,
+        ),
+        'type': 'git',
+        'directory': bytes.fromhex(commit.tree.decode()),
+        'message': commit.message,
+        'metadata': None,
+        'synthetic': False,
+        'parents': [bytes.fromhex(p.decode()) for p in commit.parents],
+    }
+
+    git_metadata = []
+    if commit.mergetag:
+        for mergetag in commit.mergetag:
+            git_metadata.append(['mergetag', mergetag.as_raw_string()])
+
+    if commit.extra:
+        git_metadata.extend([k, v] for k, v in commit.extra)
+
+    if commit.gpgsig:
+        git_metadata.append(['gpgsig', commit.gpgsig])
+
+    if git_metadata:
+        ret['metadata'] = {
+            'extra_git_headers': git_metadata,
+        }
+
+    return ret
+
+
+DULWICH_TYPES = {
+    b'blob': 'content',
+    b'tree': 'directory',
+    b'commit': 'revision',
+    b'tag': 'release',
+}
+
+
+def dulwich_tag_to_release(tag, log=None):
+    if tag.type_name != b'tag':
+        return
+
+    target_type, target = tag.object
+    ret = {
+        'id': tag.sha().digest(),
+        'name': tag.name,
+        'target': bytes.fromhex(target.decode()),
+        'target_type': DULWICH_TYPES[target_type.type_name],
+        'message': tag._message,
+        'metadata': None,
+        'synthetic': False,
+    }
+    if tag.tagger:
+        ret['author'] = parse_author(tag.tagger)
+        ret['date'] = dulwich_tsinfo_to_timestamp(
+            tag.tag_time,
+            tag.tag_timezone,
+            tag._tag_timezone_neg_utc,
+        )
+    else:
+        ret['author'] = ret['date'] = None
+
+    return ret
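[Annotation] The new dulwich_* converters mirror the pygit2-based ones but operate on dulwich objects, so they can consume objects inflated straight out of a pack file. A small sketch of the blob conversion under that assumption, using an invented in-memory blob:

```python
from dulwich.objects import Blob

from swh.loader.git.converters import dulwich_blob_to_content

blob = Blob.from_string(b'hello, Software Heritage\n')  # sample data
content = dulwich_blob_to_content(blob)

# The returned dict carries sha1/sha1_git/sha256, the raw data and a
# 'visible' status, i.e. the shape swh.storage's content_add() expects.
print(content['length'], content['status'])
```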
""" formatted_objects = [] count = 0 if not packet_size_bytes: packet_size_bytes = 0 for obj in source_list: formatted_object = formatter(obj, *args, **kwargs) if formatted_object: formatted_objects.append(formatted_object) else: continue if packet_size_bytes: count += formatted_object['length'] if len(formatted_objects) >= packet_size or count > packet_size_bytes: sender(formatted_objects) formatted_objects = [] count = 0 if formatted_objects: sender(formatted_objects) def retry_loading(error): """Retry policy when we catch a recoverable error""" exception_classes = [ # raised when two parallel insertions insert the same data. psycopg2.IntegrityError, # raised when uWSGI restarts and hungs up on the worker. requests.exceptions.ConnectionError, ] if not any(isinstance(error, exc) for exc in exception_classes): return False logger = logging.getLogger('swh.loader.git.BulkLoader') error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry loading a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return True class BulkLoader(config.SWHConfig): """A bulk loader for a git repository""" DEFAULT_CONFIG = { 'storage_class': ('str', 'remote_storage'), 'storage_args': ('list[str]', ['http://localhost:5000/']), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } def __init__(self, config): self.config = config - - if self.config['storage_class'] == 'remote_storage': - from swh.storage.api.client import RemoteStorage as Storage - else: - from swh.storage import Storage - - self.storage = Storage(*self.config['storage_args']) - + self.storage = get_storage(config['storage_class'], + config['storage_args']) self.log = logging.getLogger('swh.loader.git.BulkLoader') @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database""" num_contents = len(content_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) self.storage.content_add(content_list) self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database""" num_directories = len(directory_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) self.storage.directory_add(directory_list) self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, 
stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database""" num_revisions = len(revision_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) self.storage.revision_add(revision_list) self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database""" num_releases = len(release_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) self.storage.release_add(release_list) self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_occurrences(self, occurrence_list): """Actually send properly formatted occurrences to the database""" num_occurrences = len(occurrence_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) self.storage.occurrence_add(occurrence_list) self.log.debug("Done sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) def get_or_create_origin(self, origin_url): origin = converters.origin_url_to_origin(origin_url) origin['id'] = self.storage.origin_add_one(origin) return origin def repo_origin(self, repo, origin_url): log_id = str(uuid.uuid4()) self.log.debug('Creating origin for %s' % origin_url, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) origin = self.get_or_create_origin(origin_url) self.log.debug('Done creating origin for %s' % origin_url, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) return origin def bulk_send_blobs(self, repo, blobs, origin_id): """Format blobs as swh contents and send them to the database""" packet_size = self.config['content_packet_size'] packet_size_bytes = self.config['content_packet_size_bytes'] max_content_size = self.config['content_size_limit'] send_in_packets(blobs, converters.blob_to_content, self.send_contents, packet_size, repo=repo, packet_size_bytes=packet_size_bytes, log=self.log, max_content_size=max_content_size, origin_id=origin_id) def bulk_send_trees(self, repo, trees): """Format trees as swh directories and send them to the database""" packet_size = self.config['directory_packet_size'] send_in_packets(trees, converters.tree_to_directory, self.send_directories, packet_size, repo=repo, log=self.log) def bulk_send_commits(self, repo, commits): """Format commits as swh revisions and send them to the database""" packet_size = self.config['revision_packet_size'] send_in_packets(commits, converters.commit_to_revision, self.send_revisions, packet_size, repo=repo, log=self.log) def bulk_send_annotated_tags(self, 
repo, tags): """Format annotated tags (pygit2.Tag objects) as swh releases and send them to the database """ packet_size = self.config['release_packet_size'] send_in_packets(tags, converters.annotated_tag_to_release, self.send_releases, packet_size, repo=repo, log=self.log) def bulk_send_refs(self, repo, refs): """Format git references as swh occurrences and send them to the database """ packet_size = self.config['occurrence_packet_size'] send_in_packets(refs, converters.ref_to_occurrence, self.send_occurrences, packet_size) def list_repo_refs(self, repo, origin_id, authority_id, validity): """List all the refs from the given repository. Args: - repo (pygit2.Repository): the repository to list - origin_id (int): the id of the origin from which the repo is taken - validity (datetime.datetime): the validity date for the repository's refs - authority_id (str): the uuid of the authority on `validity`. Returns: A list of dicts with keys: - branch (str): name of the ref - revision (sha1_git): revision pointed at by the ref - origin (int) - validity (datetime.DateTime) - authority (str) Compatible with occurrence_add. """ log_id = str(uuid.uuid4()) refs = [] ref_names = repo.listall_references() for ref_name in ref_names: ref = repo.lookup_reference(ref_name) target = ref.target if not isinstance(target, Oid): self.log.debug("Peeling symbolic ref %s pointing at %s" % ( ref_name, ref.target), extra={ 'swh_type': 'git_sym_ref_peel', 'swh_name': ref_name, 'swh_target': str(ref.target), 'swh_id': log_id, }) target_obj = ref.peel() else: target_obj = repo[target] if target_obj.type == GIT_OBJ_TAG: self.log.debug("Peeling ref %s pointing at tag %s" % ( ref_name, target_obj.name), extra={ 'swh_type': 'git_ref_peel', 'swh_name': ref_name, 'swh_target': str(target_obj.name), 'swh_id': log_id, }) target_obj = ref.peel() if not target_obj.type == GIT_OBJ_COMMIT: self.log.info("Skipping ref %s pointing to %s %s" % ( ref_name, target_obj.__class__.__name__, target_obj.id.hex), extra={ 'swh_type': 'git_ref_skip', 'swh_name': ref_name, 'swh_target': str(target_obj), 'swh_id': log_id, }) refs.append({ 'branch': ref_name, 'revision': target_obj.id.raw, 'origin': origin_id, 'validity': validity, 'authority': authority_id, }) return refs def list_repo_objs(self, repo): """List all the objects from repo. 
Args: - repo (pygit2.Repository): the repository to list Returns: a dict containing lists of `Oid`s with keys for each object type: - GIT_OBJ_BLOB - GIT_OBJ_TREE - GIT_OBJ_COMMIT - GIT_OBJ_TAG """ log_id = str(uuid.uuid4()) self.log.info("Started listing %s" % repo.path, extra={ 'swh_type': 'git_list_objs_start', 'swh_repo': repo.path, 'swh_id': log_id, }) objects = get_objects_per_object_type(repo) self.log.info("Done listing the objects in %s: %d contents, " "%d directories, %d revisions, %d releases" % ( repo.path, len(objects[GIT_OBJ_BLOB]), len(objects[GIT_OBJ_TREE]), len(objects[GIT_OBJ_COMMIT]), len(objects[GIT_OBJ_TAG]), ), extra={ 'swh_type': 'git_list_objs_end', 'swh_repo': repo.path, 'swh_num_blobs': len(objects[GIT_OBJ_BLOB]), 'swh_num_trees': len(objects[GIT_OBJ_TREE]), 'swh_num_commits': len(objects[GIT_OBJ_COMMIT]), 'swh_num_tags': len(objects[GIT_OBJ_TAG]), 'swh_id': log_id, }) return objects def open_repo(self, repo_path): return pygit2.Repository(repo_path) def open_fetch_history(self, origin_id): return self.storage.fetch_history_start(origin_id) def close_fetch_history_success(self, fetch_history_id, objects, refs): data = { 'status': True, 'result': { 'contents': len(objects.get(GIT_OBJ_BLOB, [])), 'directories': len(objects.get(GIT_OBJ_TREE, [])), 'revisions': len(objects.get(GIT_OBJ_COMMIT, [])), 'releases': len(objects.get(GIT_OBJ_TAG, [])), 'occurrences': len(refs), }, } return self.storage.fetch_history_end(fetch_history_id, data) def close_fetch_history_failure(self, fetch_history_id): import traceback data = { 'status': False, 'stderr': traceback.format_exc(), } return self.storage.fetch_history_end(fetch_history_id, data) def load_repo(self, repo, objects, refs, origin_id): if self.config['send_contents']: self.bulk_send_blobs(repo, objects[GIT_OBJ_BLOB], origin_id) else: self.log.info('Not sending contents') if self.config['send_directories']: self.bulk_send_trees(repo, objects[GIT_OBJ_TREE]) else: self.log.info('Not sending directories') if self.config['send_revisions']: self.bulk_send_commits(repo, objects[GIT_OBJ_COMMIT]) else: self.log.info('Not sending revisions') if self.config['send_releases']: self.bulk_send_annotated_tags(repo, objects[GIT_OBJ_TAG]) else: self.log.info('Not sending releases') if self.config['send_occurrences']: self.bulk_send_refs(repo, refs) else: self.log.info('Not sending occurrences') def process(self, repo_path, origin_url, authority_id, validity): # Open repository repo = self.open_repo(repo_path) # Add origin to storage if needed, use the one from config if not origin = self.repo_origin(repo, origin_url) # Create fetch_history fetch_history = self.open_fetch_history(origin['id']) closed = False try: # Parse all the refs from our repo refs = self.list_repo_refs(repo, origin['id'], authority_id, validity) if not refs: self.log.info('Skipping empty repository %s' % repo_path, extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': repo_path, 'swh_num_refs': 0, }) # End fetch_history self.close_fetch_history_success(fetch_history, {}, refs) closed = True return else: self.log.info('Listed %d refs for repo %s' % ( len(refs), repo_path), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': repo_path, 'swh_num_refs': len(refs), }) # We want to load the repository, walk all the objects objects = self.list_repo_objs(repo) # Finally, load the repository self.load_repo(repo, objects, refs, origin['id']) # End fetch_history self.close_fetch_history_success(fetch_history, objects, refs) closed = True finally: if not closed: 
self.close_fetch_history_failure(fetch_history) diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 8325333..d9bf18d 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,97 +1,119 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import os -from swh.core.scheduling import Task +from swh.scheduler.task import Task from .loader import BulkLoader +from .updater import BulkUpdater class LoadGitRepository(Task): """Import a git repository to Software Heritage""" task_queue = 'swh_loader_git' CONFIG_BASE_FILENAME = 'loader/git.ini' ADDITIONAL_CONFIG = {} def __init__(self): self.config = BulkLoader.parse_config_file( base_filename=self.CONFIG_BASE_FILENAME, additional_configs=[self.ADDITIONAL_CONFIG], ) def run(self, repo_path, origin_url, authority_id, validity): """Import a git repository""" loader = BulkLoader(self.config) loader.log = self.log loader.process(repo_path, origin_url, authority_id, validity) +class UpdateGitRepository(Task): + """Import a git repository as an update""" + task_queue = 'swh_loader_git' + + CONFIG_BASE_FILENAME = 'loader/git-updater.ini' + ADDITIONAL_CONFIG = {} + + def __init__(self): + self.config = BulkUpdater.parse_config_file( + base_filename=self.CONFIG_BASE_FILENAME, + additional_configs=[self.ADDITIONAL_CONFIG], + ) + + def run(self, repo_url, base_url=None): + """Import a git repository""" + loader = BulkUpdater(self.config) + loader.log = self.log + + return loader.process(repo_url, base_url) + + class LoadGitHubRepository(LoadGitRepository): """Import a github repository to Software Heritage""" task_queue = 'swh_loader_git' CONFIG_BASE_FILENAME = 'loader/github.ini' ADDITIONAL_CONFIG = { 'github_basepath': ('str', '/srv/storage/space/data/github'), 'authority_id': ('str', '5f4d4c51-498a-4e28-88b3-b3e4e8396cba'), 'default_validity': ('str', '1970-01-01 00:00:00+00'), } def run(self, repo_fullname): authority_id = self.config['authority_id'] validity = self.config['default_validity'] repo_path = os.path.join(self.config['github_basepath'], repo_fullname[0], repo_fullname) witness_file = os.path.join(repo_path, 'witness') if os.path.exists(witness_file): validity_timestamp = os.stat(witness_file).st_mtime validity = '%s+00' % datetime.datetime.utcfromtimestamp( validity_timestamp) origin_url = 'https://github.com/%s' % repo_fullname super().run(repo_path, origin_url, authority_id, validity) class LoadGitHubRepositoryReleases(LoadGitHubRepository): """Import a GitHub repository to SoftwareHeritage, only with releases""" task_queue = 'swh_loader_git_express' def __init__(self): super(self.__class__, self).__init__() self.config.update({ 'send_contents': False, 'send_directories': False, 'send_revisions': False, 'send_releases': True, 'send_occurrences': False, }) class LoadGitHubRepositoryContents(LoadGitHubRepository): """Import a GitHub repository to SoftwareHeritage, only with contents""" task_queue = 'swh_loader_git_express' def __init__(self): super(self.__class__, self).__init__() self.config.update({ 'send_contents': True, 'send_directories': False, 'send_revisions': False, 'send_releases': False, 'send_occurrences': False, }) diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py index 852f959..ffb0cc1 100644 --- a/swh/loader/git/tests/test_converters.py +++ 
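[Annotation] The new UpdateGitRepository task wires BulkUpdater into the swh.scheduler task queue. A hypothetical direct invocation, bypassing the queue; it assumes a loader/git-updater.ini reachable through the usual swh configuration search path, and the origin URL is only an example:

```python
from swh.loader.git.tasks import UpdateGitRepository

# Instantiating the task outside the scheduler is only for illustration;
# in production the scheduler posts to the 'swh_loader_git' queue instead.
task = UpdateGitRepository()
eventful = task.run('git://github.com/githubtraining/example-dependency')
print('refs changed' if eventful else 'origin already up to date')
```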
diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py
index 852f959..ffb0cc1 100644
--- a/swh/loader/git/tests/test_converters.py
+++ b/swh/loader/git/tests/test_converters.py
@@ -1,132 +1,159 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import shutil
 import subprocess
 import tempfile
 import unittest
 import datetime
 
 from nose.tools import istest
 import pygit2
 
 import swh.loader.git.converters as converters
 from swh.core.hashutil import hex_to_hash
 
 
 class TestConverters(unittest.TestCase):
     @classmethod
     def setUpClass(cls):
         super().setUpClass()
         cls.repo_path = tempfile.mkdtemp()
         cls.repo = pygit2.init_repository(cls.repo_path, bare=True)
 
         fast_export = os.path.join(os.path.dirname(__file__),
                                    '../../../../..',
                                    'swh-storage-testdata',
                                    'git-repos',
                                    'example-submodule.fast-export.xz')
 
         xz = subprocess.Popen(
             ['xzcat'],
             stdin=open(fast_export, 'rb'),
             stdout=subprocess.PIPE,
         )
 
         git = subprocess.Popen(
             ['git', 'fast-import', '--quiet'],
             stdin=xz.stdout,
             cwd=cls.repo_path,
         )
 
         # flush stdout of xz
         xz.stdout.close()
         git.communicate()
 
     @classmethod
     def tearDownClass(cls):
         super().tearDownClass()
         shutil.rmtree(cls.repo_path)
         print(cls.repo_path)
 
     def setUp(self):
         super().setUp()
 
         self.blob_id = pygit2.Oid(
             hex='28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0')
 
         self.blob = {
             'sha1_git': self.blob_id.raw,
             'sha1': hex_to_hash('4850a3420a2262ff061cb296fb915430fa92301c'),
             'sha256': hex_to_hash('fee7c8a485a10321ad94b64135073cb5'
                                   '5f22cb9f57fa2417d2adfb09d310adef'),
             'data': (b'[submodule "example-dependency"]\n'
                      b'\tpath = example-dependency\n'
                      b'\turl = https://github.com/githubtraining/'
                      b'example-dependency.git\n'),
             'length': 124,
             'status': 'visible',
         }
 
         self.blob_hidden = {
             'sha1_git': self.blob_id.raw,
             'length': 124,
             'status': 'absent',
             'reason': 'Content too large',
             'origin': None,
         }
 
     @istest
     def blob_to_content(self):
         content = converters.blob_to_content(self.blob_id, self.repo)
         self.assertEqual(self.blob, content)
 
     @istest
     def blob_to_content_absent(self):
         max_length = self.blob['length'] - 1
         content = converters.blob_to_content(self.blob_id, self.repo,
                                              max_content_size=max_length)
         self.assertEqual(self.blob_hidden, content)
 
     @istest
     def commit_to_revision(self):
         sha1 = '9768d0b576dbaaecd80abedad6dfd0d72f1476da'
 
         commit = self.repo.revparse_single(sha1)
 
         # when
         actual_revision = converters.commit_to_revision(commit.id, self.repo)
 
         offset = datetime.timedelta(minutes=120)
         tzoffset = datetime.timezone(offset)
         expected_revision = {
             'id': hex_to_hash('9768d0b576dbaaecd80abedad6dfd0d72f1476da'),
             'directory': b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca',
             'type': 'git',
             'committer': {
                 'name': b'Stefano Zacchiroli',
                 'email': b'zack@upsilon.cc',
             },
             'author': {
                 'name': b'Stefano Zacchiroli',
                 'email': b'zack@upsilon.cc',
             },
             'committer_date': datetime.datetime(2015, 9, 24, 10, 36, 5,
                                                 tzinfo=tzoffset),
             'message': b'add submodule dependency\n',
             'metadata': None,
             'date': datetime.datetime(2015, 9, 24, 10, 36, 5,
                                       tzinfo=tzoffset),
             'parents': [
                 b'\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r'
             ],
             'synthetic': False,
         }
 
         # then
         self.assertEquals(actual_revision, expected_revision)
         self.assertEquals(offset, expected_revision['date'].utcoffset())
         self.assertEquals(offset,
                           expected_revision['committer_date'].utcoffset())
+
+    @istest
+    def ref_to_occurrence_1(self):
+        # when
+        actual_occ = converters.ref_to_occurrence({
+            'id': 'some-id',
+            'branch': 'some/branch'
+        })
+
+        # then
+        self.assertEquals(actual_occ, {
+            'id': 'some-id',
+            'branch': b'some/branch'
+        })
+
+    @istest
+    def ref_to_occurrence_2(self):
+        # when
+        actual_occ = converters.ref_to_occurrence({
+            'id': 'some-id',
+            'branch': b'some/branch'
+        })
+
+        # then
+        self.assertEquals(actual_occ, {
+            'id': 'some-id',
+            'branch': b'some/branch'
+        })
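[Annotation] These tests lean on a swh-storage-testdata checkout sitting a few directories above the source tree, as the fast_export fixture path in setUpClass() implies. Since the test methods are marked with nose's @istest (they do not carry the test_ prefix plain unittest discovery expects), a sketch of driving just this module through nose:

```python
import nose

# Assumes ../swh-storage-testdata is checked out as the fixture path implies.
nose.run(argv=['nosetests', 'swh/loader/git/tests/test_converters.py'])
```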
diff --git a/swh/loader/git/loader.py b/swh/loader/git/updater.py
similarity index 50%
copy from swh/loader/git/loader.py
copy to swh/loader/git/updater.py
index c9fb7ab..2da0a32 100644
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/updater.py
@@ -1,502 +1,665 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
+from io import BytesIO
 import logging
+import sys
 import traceback
 import uuid
 
+from collections import defaultdict
+import dulwich.client
+from dulwich.object_store import ObjectStoreGraphWalker
+from dulwich.pack import PackData, PackInflater
 import psycopg2
-import pygit2
-from pygit2 import Oid, GIT_OBJ_BLOB, GIT_OBJ_TREE, GIT_OBJ_COMMIT, GIT_OBJ_TAG
 import requests
 from retrying import retry
+from urllib.parse import urlparse
 
-from swh.core import config
+from swh.core import config, hashutil
+from swh.storage import get_storage
 
 from . import converters
-from .utils import get_objects_per_object_type
+
+
+class SWHRepoRepresentation:
+    """Repository representation for a Software Heritage origin."""
+    def __init__(self, storage, origin_id):
+        self.storage = storage
+
+        self._parents_cache = {}
+        self._type_cache = {}
+
+        if origin_id:
+            self.heads = self._cache_heads(origin_id)
+        else:
+            self.heads = []
+
+    def _fill_parents_cache(self, commit):
+        """When querying for a commit's parents, we fill the cache to a depth of 100
+        commits."""
+        root_rev = hashutil.bytehex_to_hash(commit)
+        for rev, parents in self.storage.revision_shortlog([root_rev], 100):
+            rev_id = hashutil.hash_to_bytehex(rev)
+            if rev_id not in self._parents_cache:
+                self._parents_cache[rev_id] = [
+                    hashutil.hash_to_bytehex(parent) for parent in parents
+                ]
+
+    def _cache_heads(self, origin_id):
+        """Return all the known head commits for `origin_id`"""
+        return [
+            hashutil.hash_to_bytehex(revision['id'])
+            for revision in self.storage.revision_get_by(
+                origin_id, branch_name=None, timestamp=None, limit=None)
+        ]
+
+    def get_parents(self, commit):
+        """get the parent commits for `commit`"""
+        if commit not in self._parents_cache:
+            self._fill_parents_cache(commit)
+        return self._parents_cache.get(commit, [])
+
+    def get_heads(self):
+        return self.heads
+
+    @staticmethod
+    def _encode_for_storage(objects):
+        return [hashutil.bytehex_to_hash(object) for object in objects]
+
+    @staticmethod
+    def _decode_from_storage(objects):
+        return set(hashutil.hash_to_bytehex(object) for object in objects)
+
+    def get_stored_commits(self, commits):
+        return commits - self._decode_from_storage(
+            self.storage.revision_missing(
+                self._encode_for_storage(commits)
+            )
+        )
+
+    def get_stored_tags(self, tags):
+        return tags - self._decode_from_storage(
+            self.storage.release_missing(
+                self._encode_for_storage(tags)
+            )
+        )
+
+    def get_stored_trees(self, trees):
+        return trees - self._decode_from_storage(
+            self.storage.directory_missing(
+                self._encode_for_storage(trees)
+            )
+        )
+
+    def get_stored_blobs(self, blobs):
+        ret = set()
+        for blob in blobs:
+            if self.storage.content_find({
+                    'sha1_git': hashutil.bytehex_to_hash(blob),
+            }):
+                ret.add(blob)
+
+        return ret
+
+    def graph_walker(self):
+        return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
+
+    @staticmethod
+    def filter_unwanted_refs(refs):
+        """Filter the unwanted references from refs"""
+        ret = {}
+        for ref, val in refs.items():
+            if ref.endswith(b'^{}'):
+                # Peeled refs make the git protocol explode
+                continue
+            elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'):
+                # We filter-out auto-merged GitHub pull requests
+                continue
+            else:
+                ret[ref] = val
+
+        return ret
+
+    def determine_wants(self, refs):
+        if not refs:
+            return []
+        refs = self.parse_local_refs(refs)
+        ret = set()
+        for ref, target in self.filter_unwanted_refs(refs).items():
+            if target['target_type'] is None:
+                # The target doesn't exist in Software Heritage
+                ret.add(target['target'])
+
+        return list(ret)
+
+    def parse_local_refs(self, remote_refs):
+        """Parse the remote refs information and list the objects that exist in
+        Software Heritage"""
+
+        all_objs = set(remote_refs.values()) - set(self._type_cache)
+        type_by_id = {}
+
+        tags = self.get_stored_tags(all_objs)
+        all_objs -= tags
+        for tag in tags:
+            type_by_id[tag] = 'release'
+
+        commits = self.get_stored_commits(all_objs)
+        all_objs -= commits
+        for commit in commits:
+            type_by_id[commit] = 'revision'
+
+        trees = self.get_stored_trees(all_objs)
+        all_objs -= trees
+        for tree in trees:
+            type_by_id[tree] = 'directory'
+
+        blobs = self.get_stored_blobs(all_objs)
+        all_objs -= blobs
+        for blob in blobs:
+            type_by_id[blob] = 'content'
+
+        self._type_cache.update(type_by_id)
+
+        ret = {}
+        for ref, id in remote_refs.items():
+            ret[ref] = {
+                'target': id,
+                'target_type': self._type_cache.get(id),
+            }
+        return ret
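[Annotation] graph_walker() and determine_wants() are the two callbacks dulwich's smart-protocol client expects: the walker advertises the revisions the archive already has ("haves"), while determine_wants keeps only the remote refs whose targets are unknown to Software Heritage ("wants"), so the server sends a pack containing just the missing objects. A sketch of that negotiation, where the storage endpoint, origin id and repository path are placeholders:

```python
from io import BytesIO

import dulwich.client

from swh.storage import get_storage
from swh.loader.git.updater import SWHRepoRepresentation

storage = get_storage('remote_storage', ['http://localhost:5000/'])
base_repo = SWHRepoRepresentation(storage, origin_id=42)  # placeholder id

client = dulwich.client.TCPGitClient('example.com', thin_packs=False)
pack = BytesIO()

# The server only sends objects reachable from the wanted refs that are
# not reachable from the haves advertised by the graph walker.
remote_refs = client.fetch_pack(b'/repo.git',
                                base_repo.determine_wants,
                                base_repo.graph_walker(),
                                pack.write)
```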
 
 
 def send_in_packets(source_list, formatter, sender, packet_size,
                     packet_size_bytes=None, *args, **kwargs):
     """Send objects from `source_list`, passed through `formatter` (with
     extra args *args, **kwargs), using the `sender`, in packets of
     `packet_size` objects (and of max `packet_size_bytes`).
     """
     formatted_objects = []
     count = 0
     if not packet_size_bytes:
         packet_size_bytes = 0
     for obj in source_list:
         formatted_object = formatter(obj, *args, **kwargs)
         if formatted_object:
             formatted_objects.append(formatted_object)
         else:
             continue
         if packet_size_bytes:
             count += formatted_object['length']
         if len(formatted_objects) >= packet_size or count > packet_size_bytes:
             sender(formatted_objects)
             formatted_objects = []
             count = 0
 
     if formatted_objects:
         sender(formatted_objects)
 
 
 def retry_loading(error):
     """Retry policy when we catch a recoverable error"""
     exception_classes = [
         # raised when two parallel insertions insert the same data.
         psycopg2.IntegrityError,
         # raised when uWSGI restarts and hungs up on the worker.
         requests.exceptions.ConnectionError,
     ]
 
     if not any(isinstance(error, exc) for exc in exception_classes):
         return False
 
     logger = logging.getLogger('swh.loader.git.BulkLoader')
 
     error_name = error.__module__ + '.' + error.__class__.__name__
     logger.warning('Retry loading a batch', exc_info=False, extra={
         'swh_type': 'storage_retry',
         'swh_exception_type': error_name,
         'swh_exception': traceback.format_exception(
             error.__class__,
             error,
             error.__traceback__,
         ),
     })
 
     return True
 
 
-class BulkLoader(config.SWHConfig):
+class BulkUpdater(config.SWHConfig):
     """A bulk loader for a git repository"""
+    CONFIG_BASE_FILENAME = 'loader/git-updater.ini'
 
     DEFAULT_CONFIG = {
         'storage_class': ('str', 'remote_storage'),
         'storage_args': ('list[str]', ['http://localhost:5000/']),
 
         'send_contents': ('bool', True),
         'send_directories': ('bool', True),
         'send_revisions': ('bool', True),
         'send_releases': ('bool', True),
         'send_occurrences': ('bool', True),
 
         'content_packet_size': ('int', 10000),
         'content_packet_size_bytes': ('int', 1024 * 1024 * 1024),
         'directory_packet_size': ('int', 25000),
         'revision_packet_size': ('int', 100000),
         'release_packet_size': ('int', 100000),
         'occurrence_packet_size': ('int', 100000),
     }
 
     def __init__(self, config):
         self.config = config
-
-        if self.config['storage_class'] == 'remote_storage':
-            from swh.storage.api.client import RemoteStorage as Storage
-        else:
-            from swh.storage import Storage
-
-        self.storage = Storage(*self.config['storage_args'])
-
+        self.storage = get_storage(config['storage_class'],
+                                   config['storage_args'])
         self.log = logging.getLogger('swh.loader.git.BulkLoader')
 
     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_contents(self, content_list):
         """Actually send properly formatted contents to the database"""
         num_contents = len(content_list)
         log_id = str(uuid.uuid4())
         self.log.debug("Sending %d contents" % num_contents,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'content',
                            'swh_num': num_contents,
                            'swh_id': log_id,
                        })
         self.storage.content_add(content_list)
         self.log.debug("Done sending %d contents" % num_contents,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'content',
                            'swh_num': num_contents,
                            'swh_id': log_id,
                        })
 
     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_directories(self, directory_list):
         """Actually send properly formatted directories to the database"""
         num_directories = len(directory_list)
         log_id = str(uuid.uuid4())
         self.log.debug("Sending %d directories" % num_directories,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'directory',
                            'swh_num': num_directories,
                            'swh_id': log_id,
                        })
         self.storage.directory_add(directory_list)
         self.log.debug("Done sending %d directories" % num_directories,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'directory',
                            'swh_num': num_directories,
                            'swh_id': log_id,
                        })
 
     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_revisions(self, revision_list):
         """Actually send properly formatted revisions to the database"""
         num_revisions = len(revision_list)
         log_id = str(uuid.uuid4())
         self.log.debug("Sending %d revisions" % num_revisions,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'revision',
                            'swh_num': num_revisions,
                            'swh_id': log_id,
                        })
         self.storage.revision_add(revision_list)
         self.log.debug("Done sending %d revisions" % num_revisions,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'revision',
                            'swh_num': num_revisions,
                            'swh_id': log_id,
                        })
 
     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_releases(self, release_list):
         """Actually send properly formatted releases to the database"""
         num_releases = len(release_list)
         log_id = str(uuid.uuid4())
         self.log.debug("Sending %d releases" % num_releases,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'release',
                            'swh_num': num_releases,
                            'swh_id': log_id,
                        })
         self.storage.release_add(release_list)
         self.log.debug("Done sending %d releases" % num_releases,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'release',
                            'swh_num': num_releases,
                            'swh_id': log_id,
                        })
 
     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_occurrences(self, occurrence_list):
         """Actually send properly formatted occurrences to the database"""
         num_occurrences = len(occurrence_list)
         log_id = str(uuid.uuid4())
         self.log.debug("Sending %d occurrences" % num_occurrences,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'occurrence',
                            'swh_num': num_occurrences,
                            'swh_id': log_id,
                        })
         self.storage.occurrence_add(occurrence_list)
         self.log.debug("Done sending %d occurrences" % num_occurrences,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'occurrence',
                            'swh_num': num_occurrences,
                            'swh_id': log_id,
                        })
 
+    def fetch_pack_from_origin(self, origin_url, base_origin_id, do_activity):
+        """Fetch a pack from the origin"""
+        pack_buffer = BytesIO()
+
+        base_repo = SWHRepoRepresentation(self.storage, base_origin_id)
+
+        parsed_uri = urlparse(origin_url)
+
+        path = parsed_uri.path
+        if not path.endswith('.git'):
+            path += '.git'
+
+        client = dulwich.client.TCPGitClient(parsed_uri.netloc,
+                                             thin_packs=False)
+
+        def do_pack(data, pack_buffer=pack_buffer):
+            pack_buffer.write(data)
+
+        remote_refs = client.fetch_pack(path.encode('ascii'),
+                                        base_repo.determine_wants,
+                                        base_repo.graph_walker(),
+                                        do_pack,
+                                        progress=do_activity)
+
+        if remote_refs:
+            local_refs = base_repo.parse_local_refs(remote_refs)
+        else:
+            local_refs = remote_refs = {}
+
+        pack_buffer.flush()
+        pack_size = pack_buffer.tell()
+        pack_buffer.seek(0)
+        return {
+            'remote_refs': base_repo.filter_unwanted_refs(remote_refs),
+            'local_refs': local_refs,
+            'pack_buffer': pack_buffer,
+            'pack_size': pack_size,
+        }
+
+    def get_origin(self, origin_url):
+        origin = converters.origin_url_to_origin(origin_url)
+
+        return self.storage.origin_get(origin)
+
     def get_or_create_origin(self, origin_url):
         origin = converters.origin_url_to_origin(origin_url)
 
         origin['id'] = self.storage.origin_add_one(origin)
 
         return origin
 
-    def repo_origin(self, repo, origin_url):
+    def create_origin(self, origin_url):
         log_id = str(uuid.uuid4())
         self.log.debug('Creating origin for %s' % origin_url,
                        extra={
                            'swh_type': 'storage_send_start',
                            'swh_content_type': 'origin',
                            'swh_num': 1,
                            'swh_id': log_id
                        })
         origin = self.get_or_create_origin(origin_url)
         self.log.debug('Done creating origin for %s' % origin_url,
                        extra={
                            'swh_type': 'storage_send_end',
                            'swh_content_type': 'origin',
                            'swh_num': 1,
                            'swh_id': log_id
                        })
 
         return origin
 
-    def bulk_send_blobs(self, repo, blobs, origin_id):
+    def bulk_send_blobs(self, inflater, origin_id):
         """Format blobs as swh contents and send them to the database"""
         packet_size = self.config['content_packet_size']
         packet_size_bytes = self.config['content_packet_size_bytes']
         max_content_size = self.config['content_size_limit']
 
-        send_in_packets(blobs, converters.blob_to_content,
-                        self.send_contents, packet_size, repo=repo,
+        send_in_packets(inflater, converters.dulwich_blob_to_content,
+                        self.send_contents, packet_size,
                         packet_size_bytes=packet_size_bytes, log=self.log,
                         max_content_size=max_content_size,
                         origin_id=origin_id)
 
-    def bulk_send_trees(self, repo, trees):
+    def bulk_send_trees(self, inflater):
         """Format trees as swh directories and send them to the database"""
         packet_size = self.config['directory_packet_size']
 
-        send_in_packets(trees, converters.tree_to_directory,
-                        self.send_directories, packet_size, repo=repo,
+        send_in_packets(inflater, converters.dulwich_tree_to_directory,
+                        self.send_directories, packet_size,
                         log=self.log)
 
-    def bulk_send_commits(self, repo, commits):
+    def bulk_send_commits(self, inflater):
         """Format commits as swh revisions and send them to the database"""
         packet_size = self.config['revision_packet_size']
 
-        send_in_packets(commits, converters.commit_to_revision,
-                        self.send_revisions, packet_size, repo=repo,
+        send_in_packets(inflater, converters.dulwich_commit_to_revision,
+                        self.send_revisions, packet_size,
                         log=self.log)
 
-    def bulk_send_annotated_tags(self, repo, tags):
-        """Format annotated tags (pygit2.Tag objects) as swh releases and send
+    def bulk_send_tags(self, inflater):
+        """Format annotated tags (dulwich.objects.Tag objects) as swh releases and send
         them to the database
         """
         packet_size = self.config['release_packet_size']
 
-        send_in_packets(tags, converters.annotated_tag_to_release,
-                        self.send_releases, packet_size, repo=repo,
+        send_in_packets(inflater, converters.dulwich_tag_to_release,
+                        self.send_releases, packet_size,
                         log=self.log)
 
-    def bulk_send_refs(self, repo, refs):
+    def bulk_send_refs(self, refs):
         """Format git references as swh occurrences and send them to the
         database
         """
         packet_size = self.config['occurrence_packet_size']
-        send_in_packets(refs, converters.ref_to_occurrence,
-                        self.send_occurrences, packet_size)
-
-    def list_repo_refs(self, repo, origin_id, authority_id, validity):
-        """List all the refs from the given repository.
-
-        Args:
-            - repo (pygit2.Repository): the repository to list
-            - origin_id (int): the id of the origin from which the repo is
-                taken
-            - validity (datetime.datetime): the validity date for the
-                repository's refs
-            - authority_id (str): the uuid of the authority on `validity`.
-
-        Returns:
-            A list of dicts with keys:
-                - branch (str): name of the ref
-                - revision (sha1_git): revision pointed at by the ref
-                - origin (int)
-                - validity (datetime.DateTime)
-                - authority (str)
-            Compatible with occurrence_add.
-        """
-
-        log_id = str(uuid.uuid4())
-
-        refs = []
-        ref_names = repo.listall_references()
-        for ref_name in ref_names:
-            ref = repo.lookup_reference(ref_name)
-            target = ref.target
-
-            if not isinstance(target, Oid):
-                self.log.debug("Peeling symbolic ref %s pointing at %s" % (
-                    ref_name, ref.target), extra={
-                        'swh_type': 'git_sym_ref_peel',
-                        'swh_name': ref_name,
-                        'swh_target': str(ref.target),
-                        'swh_id': log_id,
-                    })
-                target_obj = ref.peel()
-            else:
-                target_obj = repo[target]
-
-            if target_obj.type == GIT_OBJ_TAG:
-                self.log.debug("Peeling ref %s pointing at tag %s" % (
-                    ref_name, target_obj.name), extra={
-                        'swh_type': 'git_ref_peel',
-                        'swh_name': ref_name,
-                        'swh_target': str(target_obj.name),
-                        'swh_id': log_id,
-                    })
-                target_obj = ref.peel()
-
-            if not target_obj.type == GIT_OBJ_COMMIT:
-                self.log.info("Skipping ref %s pointing to %s %s" % (
-                    ref_name, target_obj.__class__.__name__,
-                    target_obj.id.hex), extra={
-                        'swh_type': 'git_ref_skip',
-                        'swh_name': ref_name,
-                        'swh_target': str(target_obj),
-                        'swh_id': log_id,
-                    })
-
-            refs.append({
-                'branch': ref_name,
-                'revision': target_obj.id.raw,
-                'origin': origin_id,
-                'validity': validity,
-                'authority': authority_id,
-            })
-
-        return refs
-
-    def list_repo_objs(self, repo):
-        """List all the objects from repo.
-
-        Args:
-            - repo (pygit2.Repository): the repository to list
-
-        Returns:
-            a dict containing lists of `Oid`s with keys for each object type:
-            - GIT_OBJ_BLOB
-            - GIT_OBJ_TREE
-            - GIT_OBJ_COMMIT
-            - GIT_OBJ_TAG
-        """
-        log_id = str(uuid.uuid4())
-
-        self.log.info("Started listing %s" % repo.path, extra={
-            'swh_type': 'git_list_objs_start',
-            'swh_repo': repo.path,
-            'swh_id': log_id,
-        })
-        objects = get_objects_per_object_type(repo)
-        self.log.info("Done listing the objects in %s: %d contents, "
-                      "%d directories, %d revisions, %d releases" % (
-                          repo.path,
-                          len(objects[GIT_OBJ_BLOB]),
-                          len(objects[GIT_OBJ_TREE]),
-                          len(objects[GIT_OBJ_COMMIT]),
-                          len(objects[GIT_OBJ_TAG]),
-                      ), extra={
-                          'swh_type': 'git_list_objs_end',
-                          'swh_repo': repo.path,
-                          'swh_num_blobs': len(objects[GIT_OBJ_BLOB]),
-                          'swh_num_trees': len(objects[GIT_OBJ_TREE]),
-                          'swh_num_commits': len(objects[GIT_OBJ_COMMIT]),
-                          'swh_num_tags': len(objects[GIT_OBJ_TAG]),
-                          'swh_id': log_id,
-                      })
-
-        return objects
-
-    def open_repo(self, repo_path):
-        return pygit2.Repository(repo_path)
+        send_in_packets(refs, lambda x: x, self.send_occurrences, packet_size)
 
     def open_fetch_history(self, origin_id):
         return self.storage.fetch_history_start(origin_id)
 
     def close_fetch_history_success(self, fetch_history_id, objects, refs):
         data = {
             'status': True,
             'result': {
-                'contents': len(objects.get(GIT_OBJ_BLOB, [])),
-                'directories': len(objects.get(GIT_OBJ_TREE, [])),
-                'revisions': len(objects.get(GIT_OBJ_COMMIT, [])),
-                'releases': len(objects.get(GIT_OBJ_TAG, [])),
+                'contents': len(objects[b'blob']),
+                'directories': len(objects[b'tree']),
+                'revisions': len(objects[b'commit']),
+                'releases': len(objects[b'tag']),
                 'occurrences': len(refs),
             },
         }
         return self.storage.fetch_history_end(fetch_history_id, data)
 
     def close_fetch_history_failure(self, fetch_history_id):
         import traceback
         data = {
             'status': False,
             'stderr': traceback.format_exc(),
         }
         return self.storage.fetch_history_end(fetch_history_id, data)
 
-    def load_repo(self, repo, objects, refs, origin_id):
+    def get_inflater(self, pack_buffer, pack_size):
+        """Reset the pack buffer and get an object inflater from it"""
+        pack_buffer.seek(0)
+        return PackInflater.for_pack_data(
+            PackData.from_file(pack_buffer, pack_size))
+
+    def list_pack(self, pack_data, pack_size):
+        id_to_type = {}
+        type_to_ids = defaultdict(set)
+
+        inflater = self.get_inflater(pack_data, pack_size)
+
+        for obj in inflater:
+            type, id = obj.type_name, obj.id
+            id_to_type[id] = type
+            type_to_ids[type].add(id)
+
+        return id_to_type, type_to_ids
+
+    def list_refs(self, remote_refs, local_refs, id_to_type, origin_id, date):
+        ret = []
+        for ref in remote_refs:
+            ret_ref = local_refs[ref].copy()
+            ret_ref.update({
+                'branch': ref,
+                'origin': origin_id,
+                'date': date,
+            })
+            if not ret_ref['target_type']:
+                target_type = id_to_type[ret_ref['target']]
+                ret_ref['target_type'] = converters.DULWICH_TYPES[target_type]
+
+            ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target'])
+
+            ret.append(ret_ref)
+
+        return ret
+
+    def load_pack(self, pack_buffer, pack_size, refs, origin_id):
         if self.config['send_contents']:
-            self.bulk_send_blobs(repo, objects[GIT_OBJ_BLOB], origin_id)
+            self.bulk_send_blobs(self.get_inflater(pack_buffer, pack_size),
+                                 origin_id)
         else:
             self.log.info('Not sending contents')
 
         if self.config['send_directories']:
-            self.bulk_send_trees(repo, objects[GIT_OBJ_TREE])
+            self.bulk_send_trees(self.get_inflater(pack_buffer, pack_size))
         else:
             self.log.info('Not sending directories')
 
         if self.config['send_revisions']:
-            self.bulk_send_commits(repo, objects[GIT_OBJ_COMMIT])
+            self.bulk_send_commits(self.get_inflater(pack_buffer, pack_size))
         else:
             self.log.info('Not sending revisions')
 
         if self.config['send_releases']:
-            self.bulk_send_annotated_tags(repo, objects[GIT_OBJ_TAG])
+            self.bulk_send_tags(self.get_inflater(pack_buffer, pack_size))
         else:
             self.log.info('Not sending releases')
 
         if self.config['send_occurrences']:
-            self.bulk_send_refs(repo, refs)
+            self.bulk_send_refs(refs)
         else:
             self.log.info('Not sending occurrences')
 
-    def process(self, repo_path, origin_url, authority_id, validity):
-        # Open repository
-        repo = self.open_repo(repo_path)
+    def process(self, origin_url, base_url):
+        eventful = False
+
+        date = datetime.datetime.now(tz=datetime.timezone.utc)
 
         # Add origin to storage if needed, use the one from config if not
-        origin = self.repo_origin(repo, origin_url)
+        origin = self.create_origin(origin_url)
+        base_origin = origin
+        if base_url:
+            base_origin = self.get_origin(base_url)
 
         # Create fetch_history
         fetch_history = self.open_fetch_history(origin['id'])
         closed = False
 
+        def do_progress(msg):
+            sys.stderr.buffer.write(msg)
+            sys.stderr.flush()
+
         try:
-            # Parse all the refs from our repo
-            refs = self.list_repo_refs(repo, origin['id'], authority_id,
-                                       validity)
+            original_heads = list(self.storage.occurrence_get(origin['id']))
+            original_heads.sort(key=lambda h: h['branch'])
+
+            fetch_info = self.fetch_pack_from_origin(
+                origin_url, base_origin['id'], do_progress)
+
+            pack_buffer = fetch_info['pack_buffer']
+            pack_size = fetch_info['pack_size']
 
-            if not refs:
-                self.log.info('Skipping empty repository %s' % repo_path,
+            remote_refs = fetch_info['remote_refs']
+            local_refs = fetch_info['local_refs']
+            if not remote_refs:
+                self.log.info('Skipping empty repository %s' % origin_url,
                               extra={
                                   'swh_type': 'git_repo_list_refs',
-                                  'swh_repo': repo_path,
+                                  'swh_repo': origin_url,
                                   'swh_num_refs': 0,
                               })
                 # End fetch_history
-                self.close_fetch_history_success(fetch_history, {}, refs)
+                self.close_fetch_history_success(fetch_history,
+                                                 defaultdict(set), [])
                 closed = True
                 return
             else:
                 self.log.info('Listed %d refs for repo %s' % (
-                    len(refs), repo_path), extra={
+                    len(remote_refs), origin_url), extra={
                         'swh_type': 'git_repo_list_refs',
-                        'swh_repo': repo_path,
-                        'swh_num_refs': len(refs),
+                        'swh_repo': origin_url,
+                        'swh_num_refs': len(remote_refs),
                     })
 
             # We want to load the repository, walk all the objects
-            objects = self.list_repo_objs(repo)
+            id_to_type, type_to_ids = self.list_pack(pack_buffer, pack_size)
+
+            # Parse the remote references and add info from the local ones
+            refs = self.list_refs(remote_refs, local_refs,
+                                  id_to_type, origin['id'], date)
 
             # Finally, load the repository
-            self.load_repo(repo, objects, refs, origin['id'])
+            self.load_pack(pack_buffer, pack_size, refs, origin['id'])
+
+            end_heads = list(self.storage.occurrence_get(origin['id']))
+            end_heads.sort(key=lambda h: h['branch'])
+
+            eventful = original_heads != end_heads
 
             # End fetch_history
-            self.close_fetch_history_success(fetch_history, objects, refs)
+            self.close_fetch_history_success(fetch_history, type_to_ids, refs)
             closed = True
 
         finally:
             if not closed:
                 self.close_fetch_history_failure(fetch_history)
+
+        return eventful
+
+
+if __name__ == '__main__':
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(process)d %(message)s'
+    )
+    config = BulkUpdater.parse_config_file(
+        base_filename='loader/git-updater.ini'
+    )
+
+    bulkupdater = BulkUpdater(config)
+
+    origin_url = sys.argv[1]
+    base_url = origin_url
+    if len(sys.argv) > 2:
+        base_url = sys.argv[2]
+
+    print(bulkupdater.process(origin_url, base_url))
diff --git a/version.txt b/version.txt
index b3a0857..7c17dc6 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.10-0-g3d36175
\ No newline at end of file
+v0.0.11-0-gafa4284
\ No newline at end of file
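[Annotation] For completeness, the `__main__` block in updater.py can also be exercised programmatically with an explicit configuration dict mirroring BulkUpdater.DEFAULT_CONFIG. One caveat worth flagging: bulk_send_blobs() reads a `content_size_limit` key that DEFAULT_CONFIG does not declare, so a configuration driving a full load must supply it; the value and origin URL below are arbitrary placeholders:

```python
from swh.loader.git.updater import BulkUpdater

config = {
    'storage_class': 'remote_storage',
    'storage_args': ['http://localhost:5000/'],
    'send_contents': True,
    'send_directories': True,
    'send_revisions': True,
    'send_releases': True,
    'send_occurrences': True,
    'content_packet_size': 10000,
    'content_packet_size_bytes': 1024 * 1024 * 1024,
    'directory_packet_size': 25000,
    'revision_packet_size': 100000,
    'release_packet_size': 100000,
    'occurrence_packet_size': 100000,
    # Read by bulk_send_blobs() but absent from DEFAULT_CONFIG: placeholder.
    'content_size_limit': 100 * 1024 * 1024,
}

updater = BulkUpdater(config)
# process() returns True when the origin's refs changed since the last visit.
print(updater.process('git://example.com/repo', base_url=None))
```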