diff --git a/PKG-INFO b/PKG-INFO index be92d23..c870732 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/README b/README index 617f8ba..f246fb7 100644 --- a/README +++ b/README @@ -1,106 +1,82 @@ The Software Heritage Git Loader is a tool and a library to walk a local Git repository and inject into the SWH dataset all contained files that weren't known before. License ======= This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. See top-level LICENSE file for the full text of the GNU General Public License along with this program. Dependencies ============ Runtime ------- - python3 -- python3-pygit2 +- python3-dulwich +- python3-retrying - python3-swh.core +- python3-swh.model - python3-swh.storage Test ---- - python3-nose Requirements ============ - implementation language, Python3 - coding guidelines: conform to PEP8 -- Git access: via libgit2/pygit +- Git access: via dulwich Configuration ============= -bin/swh-loader-git takes one argument: a configuration file in .ini format. +You can run the loader or the updater directly by calling python3 -m swh.loader.git.{loader,updater}. + +Both tools expect a configuration file in .ini format to be present in ~/.config/swh/loader/git-{loader,updater}.ini The configuration file contains the following directives: ``` [main] # the storage class used. 
one of remote_storage, local_storage storage_class = remote_storage # arguments passed to the storage class # for remote_storage: URI of the storage server storage_args = http://localhost:5000/ # for local_storage: database connection string and root of the # storage, comma separated # storage_args = dbname=softwareheritage-dev, /tmp/swh/storage -# The path to the repository to load -repo_path = /tmp/git_repo - -# The URL of the origin for the repo -origin_url = https://github.com/hylang/hy - -# The UUID of the authority that dated the validity of the repo -authority = 5f4d4c51-498a-4e28-88b3-b3e4e8396cba - -# The validity date of the refs in the given repo, in Postgres -# timestamptz format -validity = 2015-01-01 00:00:00+00 - # Whether to send the given types of objects send_contents = True send_directories = True send_revisions = True send_releases = True send_occurrences = True # The size of the packets sent to storage for each kind of object content_packet_size = 100000 content_packet_size_bytes = 1073741824 directory_packet_size = 25000 revision_packet_size = 100000 release_packet_size = 100000 occurrence_packet_size = 100000 ``` - -bin/swh-loader-git-multi takes the same arguments, and adds: - -``` -[main] -# database connection string to the lister-github database -lister_db = dbname=lister-github - -# base path of the github repositories -repo_basepath = /srv/storage/space/data/github - -# Whether to run the mass loading or just list the repos -dry_run = False - -``` diff --git a/bin/swh-loader-git b/bin/swh-loader-git deleted file mode 100755 index 9ed1d92..0000000 --- a/bin/swh-loader-git +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import sys - -from swh.loader.git import BulkLoader - -ADDITIONAL_CONFIG = { - 'repo_path': ('str', None), - - 'origin_url': ('str', 'file:///dev/null'), - 'authority': ('str', '5f4d4c51-498a-4e28-88b3-b3e4e8396cba'), - 'validity': ('str', '2015-01-01 00:00:00+00'), -} - -my_config = BulkLoader.parse_config_file( - config_filename=sys.argv[1], additional_configs=[ADDITIONAL_CONFIG]) - -logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s %(name)s %(levelname)s %(message)s', - handlers=[ - logging.StreamHandler(), - ], -) - -requests_log = logging.getLogger("requests") -requests_log.setLevel(logging.CRITICAL) - -loader = BulkLoader(my_config) -loader.process(my_config['repo_path'], - my_config['origin_url'], - my_config['authority'], - my_config['validity']) diff --git a/bin/swh-loader-git-multi b/bin/swh-loader-git-multi deleted file mode 100755 index 715828b..0000000 --- a/bin/swh-loader-git-multi +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/env python3 - -import datetime -import logging -import os -import sys - -import psycopg2 -import psycopg2.extras - -from swh.core import config -from swh.loader.git import BulkLoader - -DEFAULT_CONFIG = { - 'lister_db': ('str', 'dbname=lister-github'), - 'repo_basepath': ('str', '/srv/storage/space/data/github'), - 'dry_run': ('bool', True), - - 'db': ('str', 'dbname=softwareheritage-dev'), - 'storage_base': ('str', '/tmp/swh-loader-git/test'), - 'repo_path': ('str', None), - - 'origin_url': ('str', 'file:///dev/null'), - 'authority': ('str', '5f4d4c51-498a-4e28-88b3-b3e4e8396cba'), - 'validity': ('str', '2015-01-01 00:00:00+00'), - - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_occurrences': ('bool', True), - - 'content_packet_size': ('int', 100000), - 
'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), -} - -logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(name)s %(levelname)s %(message)s') - -logger = logging.getLogger('test-bulk-loader-all') - -base_config = config.read(sys.argv[1], DEFAULT_CONFIG) - - -def process_one_repository(base_config, repo_name): - my_config = base_config.copy() - - basepath = my_config['repo_basepath'] - - my_path = os.path.join(basepath, repo_name[0], repo_name) - my_config['repo_path'] = my_path - - if not os.path.exists(my_path): - logger.error('Repository %s does not exist at %s' % (repo_name, - my_path)) - return - - witness_file = os.path.join(my_path, 'witness') - if not os.path.exists(witness_file): - logger.warn('No witness file for repository %s, using default value ' - '%s' % (repo_name, my_config['validity'])) - else: - validity_timestamp = os.stat(witness_file).st_mtime - my_config['validity'] = "%s+00" % datetime.datetime.utcfromtimestamp( - validity_timestamp) - - logger.info('Processing repository %s fetched on %s' % ( - repo_name, - my_config['validity'])) - - if my_config['dry_run']: - return - - loader = BulkLoader(my_config) - origin = loader.get_origin() - if origin['id']: - logger.info('Repository %s already loaded (origin id=%s), skipping' % ( - repo_name, origin['id'])) - return - loader.process() - - -def list_random_repos(config): - db = psycopg2.connect(config['lister_db'], - cursor_factory=psycopg2.extras.NamedTupleCursor) - query = '''select full_name from repos_random_sample(0.001) r - inner join crawl_history c on r.id = c.repo where status=true''' - cur = db.cursor() - cur.execute(query) - ret = cur.fetchall() - cur.close() - db.close() - - return ret - -processed_repos = set() - -print('Needs updating for new BulkLoader') -sys.exit(1) - -while True: - logger.info('listing 0.001% random repos') - random_repos = list_random_repos(base_config) - logger.info('done') - for repo in random_repos: - repo_name = repo.full_name - if repo_name not in processed_repos: - try: - process_one_repository(base_config, repo_name) - except Exception: - logger.exception('Failed processing repository %s' % - repo_name) - finally: - processed_repos.add(repo_name) diff --git a/debian/control b/debian/control index c6d5571..2599b49 100644 --- a/debian/control +++ b/debian/control @@ -1,28 +1,27 @@ Source: swh-loader-git Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-nose, python3-dulwich, - python3-pygit2, python3-retrying, python3-setuptools, python3-swh.core (>= 0.0.7~), python3-swh.model (>= 0.0.3~), python3-swh.scheduler, python3-swh.storage (>= 0.0.37~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/ Package: python3-swh.loader.git Architecture: all Depends: python3-swh.core (>= 0.0.7~), python3-swh.storage (>= 0.0.37~), python3-swh.model (>= 0.0.3~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Git loader diff --git a/doc/attic/api-backend-protocol.txt b/docs/attic/api-backend-protocol.txt similarity index 100% rename from doc/attic/api-backend-protocol.txt rename to docs/attic/api-backend-protocol.txt diff --git a/doc/attic/git-loading-design.txt b/docs/attic/git-loading-design.txt similarity index 100% rename from doc/attic/git-loading-design.txt rename to docs/attic/git-loading-design.txt 
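
As a companion to the README section above: besides `python3 -m swh.loader.git.loader`, the loader can be driven from Python. This is a minimal sketch based on the `__main__` block added to swh/loader/git/loader.py later in this diff; the origin URL and clone path are placeholders, and the git-loader.ini file described in the README is assumed to be in place.

```
import datetime

from swh.loader.git.loader import GitLoader

# Placeholder values: GitLoader.load() takes an origin URL, the path of a
# local clone, and a timezone-aware fetch date (see loader.py in this diff).
origin_url = 'https://example.org/some/repo.git'
directory = '/srv/clones/some-repo'
fetch_date = datetime.datetime.now(tz=datetime.timezone.utc)

loader = GitLoader()
print(loader.load(origin_url, directory, fetch_date))
```
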
diff --git a/requirements.txt b/requirements.txt index eac3ae2..0bcbab8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,7 @@ dulwich -pygit2 retrying vcversioner swh.core >= 0.0.7 swh.model >= 0.0.3 swh.scheduler swh.storage >= 0.0.37 diff --git a/scratch/repo_walk.py b/scratch/repo_walk.py deleted file mode 100755 index 58c0c23..0000000 --- a/scratch/repo_walk.py +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env python3 - -import pygit2 -from pygit2 import GIT_SORT_TOPOLOGICAL - -import sys - -repo_path = sys.argv[1] -ref_name = sys.argv[2] - -repo = pygit2.Repository(repo_path) - -ref = repo.lookup_reference(ref_name) - -head_rev = repo[ref.target] - -for rev in repo.walk(head_rev.hex, GIT_SORT_TOPOLOGICAL): - print(rev.hex, rev.tree.hex) - for tree_entry in rev.tree: - print(repo.get(tree_entry.oid)) diff --git a/setup.py b/setup.py index 26b9cef..18b8873 100644 --- a/setup.py +++ b/setup.py @@ -1,30 +1,30 @@ #!/usr/bin/env python3 from setuptools import setup def parse_requirements(): requirements = [] with open('requirements.txt') as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.loader.git', description='Software Heritage git loader', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DCORE/', packages=['swh.loader.git', 'swh.loader.git.tests'], - scripts=['bin/swh-loader-git'], + scripts=[], install_requires=parse_requirements(), setup_requires=['vcversioner'], vcversioner={}, include_package_data=True, ) diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index be92d23..c870732 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.13 +Version: 0.0.14 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt index 149c9a6..4f2b5af 100644 --- a/swh.loader.git.egg-info/SOURCES.txt +++ b/swh.loader.git.egg-info/SOURCES.txt @@ -1,40 +1,37 @@ .gitignore .gitmodules AUTHORS LICENSE MANIFEST.in Makefile README requirements.txt setup.py version.txt bin/dir-git-repo-meta.sh -bin/swh-loader-git -bin/swh-loader-git-multi debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format -doc/attic/api-backend-protocol.txt -doc/attic/git-loading-design.txt +docs/attic/api-backend-protocol.txt +docs/attic/git-loading-design.txt resources/local-loader-git.ini resources/remote-loader-git.ini resources/updater.ini resources/test/back.ini resources/test/db-manager.ini scratch/analyse-profile.py -scratch/repo_walk.py swh.loader.git.egg-info/PKG-INFO swh.loader.git.egg-info/SOURCES.txt swh.loader.git.egg-info/dependency_links.txt swh.loader.git.egg-info/requires.txt swh.loader.git.egg-info/top_level.txt swh/loader/git/__init__.py +swh/loader/git/base.py swh/loader/git/converters.py swh/loader/git/loader.py swh/loader/git/tasks.py swh/loader/git/updater.py -swh/loader/git/utils.py swh/loader/git/tests/test_converters.py \ No newline at end of file diff --git a/swh.loader.git.egg-info/requires.txt b/swh.loader.git.egg-info/requires.txt index 80874ba..2954b3c 100644 --- 
a/swh.loader.git.egg-info/requires.txt +++ b/swh.loader.git.egg-info/requires.txt @@ -1,8 +1,7 @@ dulwich -pygit2 retrying swh.core>=0.0.7 swh.model>=0.0.3 swh.scheduler swh.storage>=0.0.37 vcversioner diff --git a/swh/loader/git/__init__.py b/swh/loader/git/__init__.py index 8d731bb..e69de29 100644 --- a/swh/loader/git/__init__.py +++ b/swh/loader/git/__init__.py @@ -1 +0,0 @@ -from .loader import BulkLoader # noqa diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py new file mode 100644 index 0000000..5018e50 --- /dev/null +++ b/swh/loader/git/base.py @@ -0,0 +1,390 @@ +# Copyright (C) 2016 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +import traceback +import uuid + +import psycopg2 +import requests +from retrying import retry + +from swh.core import config +from swh.storage import get_storage + + +def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): + """Send `objects`, using the `sender`, in packets of `packet_size` objects (and + of max `packet_size_bytes`). + """ + formatted_objects = [] + count = 0 + if not packet_size_bytes: + packet_size_bytes = 0 + for obj in objects: + if not obj: + continue + formatted_objects.append(obj) + if packet_size_bytes: + count += obj['length'] + if len(formatted_objects) >= packet_size or count > packet_size_bytes: + sender(formatted_objects) + formatted_objects = [] + count = 0 + + if formatted_objects: + sender(formatted_objects) + + +def retry_loading(error): + """Retry policy when we catch a recoverable error""" + exception_classes = [ + # raised when two parallel insertions insert the same data. + psycopg2.IntegrityError, + # raised when uWSGI restarts and hungs up on the worker. + requests.exceptions.ConnectionError, + ] + + if not any(isinstance(error, exc) for exc in exception_classes): + return False + + logger = logging.getLogger('swh.loader.git.BulkLoader') + + error_name = error.__module__ + '.' + error.__class__.__name__ + logger.warning('Retry loading a batch', exc_info=False, extra={ + 'swh_type': 'storage_retry', + 'swh_exception_type': error_name, + 'swh_exception': traceback.format_exception( + error.__class__, + error, + error.__traceback__, + ), + }) + + return True + + +class BaseLoader(config.SWHConfig): + """This base class is a pattern for loaders. + + The external calling convention is as such: + - instantiate the class once (loads storage and the configuration) + - for each origin, call load with the origin-specific arguments (for + instance, an origin URL). 
+ + load calls several methods that must be implemented in subclasses: + + - prepare(*args, **kwargs) prepares the loader for the new origin + - get_origin gets the origin object associated to the current loader + - fetch_data downloads the necessary data from the origin + - get_{contents,directories,revisions,releases,occurrences} retrieve each + kind of object from the origin + - has_* checks whether there are some objects to load for that object type + - get_fetch_history_result retrieves the data to insert in the + fetch_history table once the load was successful + - eventful returns whether the load was eventful or not + """ + + CONFIG_BASE_FILENAME = None + + DEFAULT_CONFIG = { + 'storage_class': ('str', 'remote_storage'), + 'storage_args': ('list[str]', ['http://localhost:5000/']), + + 'send_contents': ('bool', True), + 'send_directories': ('bool', True), + 'send_revisions': ('bool', True), + 'send_releases': ('bool', True), + 'send_occurrences': ('bool', True), + + 'content_packet_size': ('int', 10000), + 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), + 'directory_packet_size': ('int', 25000), + 'revision_packet_size': ('int', 100000), + 'release_packet_size': ('int', 100000), + 'occurrence_packet_size': ('int', 100000), + } + + ADDITIONAL_CONFIG = {} + + def __init__(self): + self.config = self.parse_config_file( + additional_configs=[self.ADDITIONAL_CONFIG]) + + self.storage = get_storage(self.config['storage_class'], + self.config['storage_args']) + self.log = logging.getLogger('swh.loader.git.BulkLoader') + + def prepare(self, *args, **kwargs): + """Prepare the data source to be loaded""" + raise NotImplementedError + + def get_origin(self): + """Get the origin that is currently being loaded""" + raise NotImplementedError + + def fetch_data(self): + """Fetch the data from the data source""" + raise NotImplementedError + + def has_contents(self): + """Checks whether we need to load contents""" + return True + + def get_contents(self): + """Get the contents that need to be loaded""" + raise NotImplementedError + + def has_directories(self): + """Checks whether we need to load directories""" + return True + + def get_directories(self): + """Get the directories that need to be loaded""" + raise NotImplementedError + + def has_revisions(self): + """Checks whether we need to load revisions""" + return True + + def get_revisions(self): + """Get the revisions that need to be loaded""" + raise NotImplementedError + + def has_releases(self): + """Checks whether we need to load releases""" + return True + + def get_releases(self): + """Get the releases that need to be loaded""" + raise NotImplementedError + + def has_occurrences(self): + """Checks whether we need to load occurrences""" + return True + + def get_occurrences(self): + """Get the occurrences that need to be loaded""" + raise NotImplementedError + + def get_fetch_history_result(self): + """Return the data to store in fetch_history for the current loader""" + raise NotImplementedError + + def eventful(self): + """Whether the load was eventful""" + raise NotImplementedError + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_contents(self, content_list): + """Actually send properly formatted contents to the database""" + num_contents = len(content_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + 
self.storage.content_add(content_list) + self.log.debug("Done sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_directories(self, directory_list): + """Actually send properly formatted directories to the database""" + num_directories = len(directory_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) + self.storage.directory_add(directory_list) + self.log.debug("Done sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_revisions(self, revision_list): + """Actually send properly formatted revisions to the database""" + num_revisions = len(revision_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d revisions" % num_revisions, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'revision', + 'swh_num': num_revisions, + 'swh_id': log_id, + }) + self.storage.revision_add(revision_list) + self.log.debug("Done sending %d revisions" % num_revisions, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'revision', + 'swh_num': num_revisions, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_releases(self, release_list): + """Actually send properly formatted releases to the database""" + num_releases = len(release_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d releases" % num_releases, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'release', + 'swh_num': num_releases, + 'swh_id': log_id, + }) + self.storage.release_add(release_list) + self.log.debug("Done sending %d releases" % num_releases, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'release', + 'swh_num': num_releases, + 'swh_id': log_id, + }) + + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) + def send_occurrences(self, occurrence_list): + """Actually send properly formatted occurrences to the database""" + num_occurrences = len(occurrence_list) + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d occurrences" % num_occurrences, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'occurrence', + 'swh_num': num_occurrences, + 'swh_id': log_id, + }) + self.storage.occurrence_add(occurrence_list) + self.log.debug("Done sending %d occurrences" % num_occurrences, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'occurrence', + 'swh_num': num_occurrences, + 'swh_id': log_id, + }) + + def send_origin(self, origin): + log_id = str(uuid.uuid4()) + self.log.debug('Creating %s origin for %s' % (origin['type'], + origin['url']), + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'origin', + 'swh_num': 1, + 'swh_id': log_id + }) + origin_id = self.storage.origin_add_one(origin) + self.log.debug('Done creating %s origin for %s' % (origin['type'], + origin['url']), + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'origin', + 'swh_num': 1, + 'swh_id': log_id + }) + + return origin_id + + def send_all_contents(self, contents): + """Send 
all the contents to the database""" + packet_size = self.config['content_packet_size'] + packet_size_bytes = self.config['content_packet_size_bytes'] + + send_in_packets(contents, self.send_contents, packet_size, + packet_size_bytes=packet_size_bytes) + + def send_all_directories(self, directories): + """Send all the directories to the database""" + packet_size = self.config['directory_packet_size'] + + send_in_packets(directories, self.send_directories, packet_size) + + def send_all_revisions(self, revisions): + """Send all the revisions to the database""" + packet_size = self.config['revision_packet_size'] + + send_in_packets(revisions, self.send_revisions, packet_size) + + def send_all_releases(self, releases): + """Send all the releases to the database + """ + packet_size = self.config['release_packet_size'] + + send_in_packets(releases, self.send_releases, packet_size) + + def send_all_occurrences(self, occurrences): + """Send all the occurrences to the database + """ + packet_size = self.config['occurrence_packet_size'] + + send_in_packets(occurrences, self.send_occurrences, packet_size) + + def open_fetch_history(self): + return self.storage.fetch_history_start(self.origin_id) + + def close_fetch_history_success(self, fetch_history_id, result): + data = { + 'status': True, + 'result': result, + } + return self.storage.fetch_history_end(fetch_history_id, data) + + def close_fetch_history_failure(self, fetch_history_id): + import traceback + data = { + 'status': False, + 'stderr': traceback.format_exc(), + } + return self.storage.fetch_history_end(fetch_history_id, data) + + def load(self, *args, **kwargs): + + self.prepare(*args, **kwargs) + origin = self.get_origin() + self.origin_id = self.send_origin(origin) + + fetch_history_id = self.open_fetch_history() + try: + self.fetch_data() + + if self.config['send_contents'] and self.has_contents(): + self.send_all_contents(self.get_contents()) + + if self.config['send_directories'] and self.has_directories(): + self.send_all_directories(self.get_directories()) + + if self.config['send_revisions'] and self.has_revisions(): + self.send_all_revisions(self.get_revisions()) + + if self.config['send_releases'] and self.has_releases(): + self.send_all_releases(self.get_releases()) + + if self.config['send_occurrences'] and self.has_occurrences(): + self.send_all_occurrences(self.get_occurrences()) + + self.close_fetch_history_success(fetch_history_id, + self.get_fetch_history_result()) + except: + self.close_fetch_history_failure(fetch_history_id) + raise + + return self.eventful() diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py index 4beaf04..657c525 100644 --- a/swh/loader/git/converters.py +++ b/swh/loader/git/converters.py @@ -1,363 +1,213 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -"""Convert pygit2 objects to dictionaries suitable for swh.storage""" - -from pygit2 import GIT_OBJ_COMMIT +"""Convert dulwich objects to dictionaries suitable for swh.storage""" from swh.core import hashutil -from .utils import format_date - -HASH_ALGORITHMS = ['sha1', 'sha256'] - - -def blob_to_content(id, repo, log=None, max_content_size=None, origin_id=None): - """Format a blob as a content""" - blob = repo[id] - size = blob.size - ret = { - 'sha1_git': id.raw, - 'length': blob.size, - 'status': 'absent' - } - - if 
max_content_size: - if size > max_content_size: - if log: - log.info('Skipping content %s, too large (%s > %s)' % - (id.hex, size, max_content_size), extra={ - 'swh_type': 'loader_git_content_skip', - 'swh_repo': repo.path, - 'swh_id': id.hex, - 'swh_size': size, - }) - ret['reason'] = 'Content too large' - ret['origin'] = origin_id - return ret - - data = blob.data - hashes = hashutil.hashdata(data, HASH_ALGORITHMS) - ret.update(hashes) - ret['data'] = data - ret['status'] = 'visible' - - return ret - - -def tree_to_directory(id, repo, log=None): - """Format a tree as a directory""" - ret = { - 'id': id.raw, - } - entries = [] - ret['entries'] = entries - - entry_type_map = { - 'tree': 'dir', - 'blob': 'file', - 'commit': 'rev', - } - - for entry in repo[id]: - entries.append({ - 'type': entry_type_map[entry.type], - 'perms': entry.filemode, - 'name': entry._name, - 'target': entry.id.raw, - }) - - return ret - -def commit_to_revision(id, repo, log=None): - """Format a commit as a revision""" - commit = repo[id] - - author = commit.author - committer = commit.committer - return { - 'id': id.raw, - 'date': format_date(author), - 'committer_date': format_date(committer), - 'type': 'git', - 'directory': commit.tree_id.raw, - 'message': commit.raw_message, - 'metadata': None, - 'author': { - 'name': author.raw_name, - 'email': author.raw_email, - }, - 'committer': { - 'name': committer.raw_name, - 'email': committer.raw_email, - }, - 'synthetic': False, - 'parents': [p.raw for p in commit.parent_ids], - } - - -def annotated_tag_to_release(id, repo, log=None): - """Format an annotated tag as a release""" - tag = repo[id] - - tag_pointer = repo[tag.target] - if tag_pointer.type != GIT_OBJ_COMMIT: - if log: - log.warn("Ignoring tag %s pointing at %s %s" % ( - tag.id.hex, tag_pointer.__class__.__name__, - tag_pointer.id.hex), extra={ - 'swh_type': 'loader_git_tag_ignore', - 'swh_repo': repo.path, - 'swh_tag_id': tag.id.hex, - 'swh_tag_dest': { - 'type': tag_pointer.__class__.__name__, - 'id': tag_pointer.id.hex, - }, - }) - return - - if not tag.tagger: - if log: - log.warn("Tag %s has no author, using default values" - % id.hex, extra={ - 'swh_type': 'loader_git_tag_author_default', - 'swh_repo': repo.path, - 'swh_tag_id': tag.id.hex, - }) - author = None - date = None - else: - author = { - 'name': tag.tagger.raw_name, - 'email': tag.tagger.raw_email, - } - date = format_date(tag.tagger) - - return { - 'id': id.raw, - 'date': date, - 'target': tag.target.raw, - 'target_type': 'revision', - 'message': tag._message, - 'name': tag.name.raw, - 'author': author, - 'metadata': None, - 'synthetic': False, - } - - -def ref_to_occurrence(ref): - """Format a reference as an occurrence""" - occ = ref.copy() - if 'branch' in ref: - branch = ref['branch'] - if isinstance(branch, str): - occ['branch'] = branch.encode('utf-8') - else: - occ['branch'] = branch - return occ +HASH_ALGORITHMS = hashutil.ALGORITHMS - {'sha1_git'} def origin_url_to_origin(origin_url): """Format a pygit2.Repository as an origin suitable for swh.storage""" return { 'type': 'git', 'url': origin_url, } def dulwich_blob_to_content(blob, log=None, max_content_size=None, origin_id=None): """Convert a dulwich blob to a Software Heritage content""" if blob.type_name != b'blob': return size = blob.raw_length() ret = { 'sha1_git': blob.sha().digest(), 'length': size, 'status': 'absent' } if max_content_size: if size > max_content_size: if log: log.info('Skipping content %s, too large (%s > %s)' % (blob.id.encode(), size, max_content_size), 
extra={ 'swh_type': 'loader_git_content_skip', 'swh_id': id.hex, 'swh_size': size, }) ret['reason'] = 'Content too large' ret['origin'] = origin_id return ret data = blob.as_raw_string() hashes = hashutil.hashdata(data, HASH_ALGORITHMS) ret.update(hashes) ret['data'] = data ret['status'] = 'visible' return ret def dulwich_tree_to_directory(tree, log=None): """Format a tree as a directory""" if tree.type_name != b'tree': return ret = { 'id': tree.sha().digest(), } entries = [] ret['entries'] = entries entry_mode_map = { 0o040000: 'dir', 0o160000: 'rev', 0o100644: 'file', 0o100755: 'file', 0o120000: 'file', } for entry in tree.iteritems(): entries.append({ 'type': entry_mode_map.get(entry.mode, 'file'), 'perms': entry.mode, 'name': entry.path, 'target': hashutil.hex_to_hash(entry.sha.decode('ascii')), }) return ret def parse_author(name_email): """Parse an author line""" if name_email is None: return None try: open_bracket = name_email.index(b'<') except ValueError: name = email = None else: raw_name = name_email[:open_bracket] raw_email = name_email[open_bracket+1:] if not raw_name: name = None elif raw_name.endswith(b' '): name = raw_name[:-1] else: name = raw_name try: close_bracket = raw_email.index(b'>') except ValueError: email = None else: email = raw_email[:close_bracket] return { 'name': name, 'email': email, 'fullname': name_email, } def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc): """Convert the dulwich timestamp information to a structure compatible with Software Heritage""" return { 'timestamp': timestamp, 'offset': timezone // 60, 'negative_utc': timezone_neg_utc if timezone == 0 else None, } def dulwich_commit_to_revision(commit, log=None): if commit.type_name != b'commit': return ret = { 'id': commit.sha().digest(), 'author': parse_author(commit.author), 'date': dulwich_tsinfo_to_timestamp( commit.author_time, commit.author_timezone, commit._author_timezone_neg_utc, ), 'committer': parse_author(commit.committer), 'committer_date': dulwich_tsinfo_to_timestamp( commit.commit_time, commit.commit_timezone, commit._commit_timezone_neg_utc, ), 'type': 'git', 'directory': bytes.fromhex(commit.tree.decode()), 'message': commit.message, 'metadata': None, 'synthetic': False, 'parents': [bytes.fromhex(p.decode()) for p in commit.parents], } git_metadata = [] if commit.encoding is not None: git_metadata.append(['encoding', commit.encoding]) if commit.mergetag: for mergetag in commit.mergetag: git_metadata.append(['mergetag', mergetag.as_raw_string()]) if commit.extra: git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra) if commit.gpgsig: git_metadata.append(['gpgsig', commit.gpgsig]) if git_metadata: ret['metadata'] = { 'extra_headers': git_metadata, } return ret DULWICH_TYPES = { b'blob': 'content', b'tree': 'directory', b'commit': 'revision', b'tag': 'release', } def dulwich_tag_to_release(tag, log=None): if tag.type_name != b'tag': return target_type, target = tag.object ret = { 'id': tag.sha().digest(), 'name': tag.name, 'target': bytes.fromhex(target.decode()), 'target_type': DULWICH_TYPES[target_type.type_name], 'message': tag._message, 'metadata': None, 'synthetic': False, } if tag.tagger: ret['author'] = parse_author(tag.tagger) ret['date'] = dulwich_tsinfo_to_timestamp( tag.tag_time, tag.tag_timezone, tag._tag_timezone_neg_utc, ) else: ret['author'] = ret['date'] = None return ret diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index b7fa942..e005788 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ 
-1,497 +1,145 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import logging -import traceback -import uuid +from collections import defaultdict +import datetime -import psycopg2 -import pygit2 -from pygit2 import Oid, GIT_OBJ_BLOB, GIT_OBJ_TREE, GIT_OBJ_COMMIT, GIT_OBJ_TAG -import requests -from retrying import retry +import dulwich.repo -from swh.core import config -from swh.storage import get_storage +from swh.core import hashutil -from . import converters -from .utils import get_objects_per_object_type +from . import base, converters -def send_in_packets(source_list, formatter, sender, packet_size, - packet_size_bytes=None, *args, **kwargs): - """Send objects from `source_list`, passed through `formatter` (with - extra args *args, **kwargs), using the `sender`, in packets of - `packet_size` objects (and of max `packet_size_bytes`). - +class GitLoader(base.BaseLoader): + """Load a git repository from a directory. """ - formatted_objects = [] - count = 0 - if not packet_size_bytes: - packet_size_bytes = 0 - for obj in source_list: - formatted_object = formatter(obj, *args, **kwargs) - if formatted_object: - formatted_objects.append(formatted_object) - else: - continue - if packet_size_bytes: - count += formatted_object['length'] - if len(formatted_objects) >= packet_size or count > packet_size_bytes: - sender(formatted_objects) - formatted_objects = [] - count = 0 - - if formatted_objects: - sender(formatted_objects) - - -def retry_loading(error): - """Retry policy when we catch a recoverable error""" - exception_classes = [ - # raised when two parallel insertions insert the same data. - psycopg2.IntegrityError, - # raised when uWSGI restarts and hungs up on the worker. - requests.exceptions.ConnectionError, - ] - - if not any(isinstance(error, exc) for exc in exception_classes): - return False - - logger = logging.getLogger('swh.loader.git.BulkLoader') - - error_name = error.__module__ + '.' 
+ error.__class__.__name__ - logger.warning('Retry loading a batch', exc_info=False, extra={ - 'swh_type': 'storage_retry', - 'swh_exception_type': error_name, - 'swh_exception': traceback.format_exception( - error.__class__, - error, - error.__traceback__, - ), - }) - - return True - - -class BulkLoader(config.SWHConfig): - """A bulk loader for a git repository""" - - DEFAULT_CONFIG = { - 'storage_class': ('str', 'remote_storage'), - 'storage_args': ('list[str]', ['http://localhost:5000/']), - - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_occurrences': ('bool', True), - - 'content_packet_size': ('int', 10000), - 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), - 'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), - } - - def __init__(self, config): - self.config = config - self.storage = get_storage(config['storage_class'], - config['storage_args']) - self.log = logging.getLogger('swh.loader.git.BulkLoader') - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): - """Actually send properly formatted contents to the database""" - num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): - """Actually send properly formatted directories to the database""" - num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): - """Actually send properly formatted revisions to the database""" - num_revisions = len(revision_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - self.storage.revision_add(revision_list) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): - """Actually send properly formatted releases to the database""" - num_releases = len(release_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 
'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - self.storage.release_add(release_list) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_occurrences(self, occurrence_list): - """Actually send properly formatted occurrences to the database""" - num_occurrences = len(occurrence_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - self.storage.occurrence_add(occurrence_list) - self.log.debug("Done sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - - def get_or_create_origin(self, origin_url): - origin = converters.origin_url_to_origin(origin_url) - - origin['id'] = self.storage.origin_add_one(origin) - - return origin - - def repo_origin(self, repo, origin_url): - log_id = str(uuid.uuid4()) - self.log.debug('Creating origin for %s' % origin_url, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - origin = self.get_or_create_origin(origin_url) - self.log.debug('Done creating origin for %s' % origin_url, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - - return origin - - def bulk_send_blobs(self, repo, blobs, origin_id): - """Format blobs as swh contents and send them to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] + + CONFIG_BASE_FILENAME = 'loader/git-loader.ini' + + def prepare(self, origin_url, directory, fetch_date): + self.origin_url = origin_url + self.repo = dulwich.repo.Repo(directory) + self.fetch_date = fetch_date + + def get_origin(self): + """Get the origin that is currently being loaded""" + return converters.origin_url_to_origin(self.origin_url) + + def iter_objects(self): + object_store = self.repo.object_store + + for pack in object_store.packs: + objs = list(pack.index.iterentries()) + objs.sort(key=lambda x: x[1]) + for sha, offset, crc32 in objs: + yield hashutil.hash_to_bytehex(sha) + + yield from object_store._iter_loose_objects() + yield from object_store._iter_alternate_objects() + + def fetch_data(self): + """Fetch the data from the data source""" + type_to_ids = defaultdict(list) + for oid in self.iter_objects(): + type_name = self.repo[oid].type_name + type_to_ids[type_name].append(oid) + + self.type_to_ids = type_to_ids + + def has_contents(self): + """Checks whether we need to load contents""" + return bool(self.type_to_ids[b'blob']) + + def get_contents(self): + """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] - send_in_packets(blobs, converters.blob_to_content, - self.send_contents, packet_size, repo=repo, - packet_size_bytes=packet_size_bytes, - log=self.log, max_content_size=max_content_size, - origin_id=origin_id) - - def bulk_send_trees(self, repo, trees): - """Format trees as swh directories and send them to the database""" - packet_size = self.config['directory_packet_size'] - - send_in_packets(trees, converters.tree_to_directory, - 
self.send_directories, packet_size, repo=repo, - log=self.log) - - def bulk_send_commits(self, repo, commits): - """Format commits as swh revisions and send them to the database""" - packet_size = self.config['revision_packet_size'] - - send_in_packets(commits, converters.commit_to_revision, - self.send_revisions, packet_size, repo=repo, - log=self.log) - - def bulk_send_annotated_tags(self, repo, tags): - """Format annotated tags (pygit2.Tag objects) as swh releases and send - them to the database - """ - packet_size = self.config['release_packet_size'] - - send_in_packets(tags, converters.annotated_tag_to_release, - self.send_releases, packet_size, repo=repo, - log=self.log) - - def bulk_send_refs(self, repo, refs): - """Format git references as swh occurrences and send them to the - database - """ - packet_size = self.config['occurrence_packet_size'] - - send_in_packets(refs, converters.ref_to_occurrence, - self.send_occurrences, packet_size) - - def list_repo_refs(self, repo, origin_id, authority_id, validity): - """List all the refs from the given repository. - - Args: - - repo (pygit2.Repository): the repository to list - - origin_id (int): the id of the origin from which the repo is - taken - - validity (datetime.datetime): the validity date for the - repository's refs - - authority_id (str): the uuid of the authority on `validity`. - - Returns: - A list of dicts with keys: - - branch (str): name of the ref - - revision (sha1_git): revision pointed at by the ref - - origin (int) - - validity (datetime.DateTime) - - authority (str) - Compatible with occurrence_add. - """ - - log_id = str(uuid.uuid4()) - - refs = [] - ref_names = repo.listall_references() - for ref_name in ref_names: - ref = repo.lookup_reference(ref_name) - target = ref.target - - if not isinstance(target, Oid): - self.log.debug("Peeling symbolic ref %s pointing at %s" % ( - ref_name, ref.target), extra={ - 'swh_type': 'git_sym_ref_peel', - 'swh_name': ref_name, - 'swh_target': str(ref.target), - 'swh_id': log_id, - }) - target_obj = ref.peel() - else: - target_obj = repo[target] - - if target_obj.type == GIT_OBJ_TAG: - self.log.debug("Peeling ref %s pointing at tag %s" % ( - ref_name, target_obj.name), extra={ - 'swh_type': 'git_ref_peel', - 'swh_name': ref_name, - 'swh_target': str(target_obj.name), - 'swh_id': log_id, - }) - target_obj = ref.peel() - - if not target_obj.type == GIT_OBJ_COMMIT: - self.log.info("Skipping ref %s pointing to %s %s" % ( - ref_name, target_obj.__class__.__name__, - target_obj.id.hex), extra={ - 'swh_type': 'git_ref_skip', - 'swh_name': ref_name, - 'swh_target': str(target_obj), - 'swh_id': log_id, - }) - - refs.append({ - 'branch': ref_name, - 'revision': target_obj.id.raw, + for oid in self.type_to_ids[b'blob']: + yield converters.dulwich_blob_to_content( + self.repo[oid], log=self.log, + max_content_size=max_content_size, + origin_id=self.origin_id) + + def has_directories(self): + """Checks whether we need to load directories""" + return bool(self.type_to_ids[b'tree']) + + def get_directories(self): + """Get the directories that need to be loaded""" + for oid in self.type_to_ids[b'tree']: + yield converters.dulwich_tree_to_directory( + self.repo[oid], log=self.log) + + def has_revisions(self): + """Checks whether we need to load revisions""" + return bool(self.type_to_ids[b'commit']) + + def get_revisions(self): + """Get the revisions that need to be loaded""" + for oid in self.type_to_ids[b'commit']: + yield converters.dulwich_commit_to_revision( + self.repo[oid], log=self.log) + + 
def has_releases(self): + """Checks whether we need to load releases""" + return bool(self.type_to_ids[b'tag']) + + def get_releases(self): + """Get the releases that need to be loaded""" + for oid in self.type_to_ids[b'tag']: + yield converters.dulwich_tag_to_release( + self.repo[oid], log=self.log) + + def has_occurrences(self): + """Checks whether we need to load occurrences""" + return True + + def get_occurrences(self): + """Get the occurrences that need to be loaded""" + repo = self.repo + origin_id = self.origin_id + fetch_date = self.fetch_date + + for ref, target in repo.refs.as_dict().items(): + target_type_name = repo[target].type_name + target_type = converters.DULWICH_TYPES[target_type_name] + yield { + 'branch': ref, 'origin': origin_id, - 'validity': validity, - 'authority': authority_id, - }) - - return refs - - def list_repo_objs(self, repo): - """List all the objects from repo. - - Args: - - repo (pygit2.Repository): the repository to list - - Returns: - a dict containing lists of `Oid`s with keys for each object type: - - GIT_OBJ_BLOB - - GIT_OBJ_TREE - - GIT_OBJ_COMMIT - - GIT_OBJ_TAG - """ - log_id = str(uuid.uuid4()) - - self.log.info("Started listing %s" % repo.path, extra={ - 'swh_type': 'git_list_objs_start', - 'swh_repo': repo.path, - 'swh_id': log_id, - }) - objects = get_objects_per_object_type(repo) - self.log.info("Done listing the objects in %s: %d contents, " - "%d directories, %d revisions, %d releases" % ( - repo.path, - len(objects[GIT_OBJ_BLOB]), - len(objects[GIT_OBJ_TREE]), - len(objects[GIT_OBJ_COMMIT]), - len(objects[GIT_OBJ_TAG]), - ), extra={ - 'swh_type': 'git_list_objs_end', - 'swh_repo': repo.path, - 'swh_num_blobs': len(objects[GIT_OBJ_BLOB]), - 'swh_num_trees': len(objects[GIT_OBJ_TREE]), - 'swh_num_commits': len(objects[GIT_OBJ_COMMIT]), - 'swh_num_tags': len(objects[GIT_OBJ_TAG]), - 'swh_id': log_id, - }) - - return objects - - def open_repo(self, repo_path): - return pygit2.Repository(repo_path) - - def open_fetch_history(self, origin_id): - return self.storage.fetch_history_start(origin_id) - - def close_fetch_history_success(self, fetch_history_id, objects, refs): - data = { - 'status': True, - 'result': { - 'contents': len(objects.get(GIT_OBJ_BLOB, [])), - 'directories': len(objects.get(GIT_OBJ_TREE, [])), - 'revisions': len(objects.get(GIT_OBJ_COMMIT, [])), - 'releases': len(objects.get(GIT_OBJ_TAG, [])), - 'occurrences': len(refs), - }, + 'target': hashutil.bytehex_to_hash(target), + 'target_type': target_type, + 'date': fetch_date, + } + + def get_fetch_history_result(self): + """Return the data to store in fetch_history for the current loader""" + return { + 'contents': len(self.type_to_ids[b'blob']), + 'directories': len(self.type_to_ids[b'tree']), + 'revisions': len(self.type_to_ids[b'commit']), + 'releases': len(self.type_to_ids[b'tag']), + 'occurrences': len(self.repo.refs.allkeys()), } - return self.storage.fetch_history_end(fetch_history_id, data) - def close_fetch_history_failure(self, fetch_history_id): - import traceback - data = { - 'status': False, - 'stderr': traceback.format_exc(), - } - return self.storage.fetch_history_end(fetch_history_id, data) - - def load_repo(self, repo, objects, refs, origin_id): - - if self.config['send_contents']: - self.bulk_send_blobs(repo, objects[GIT_OBJ_BLOB], origin_id) - else: - self.log.info('Not sending contents') - - if self.config['send_directories']: - self.bulk_send_trees(repo, objects[GIT_OBJ_TREE]) - else: - self.log.info('Not sending directories') - - if 
self.config['send_revisions']: - self.bulk_send_commits(repo, objects[GIT_OBJ_COMMIT]) - else: - self.log.info('Not sending revisions') - - if self.config['send_releases']: - self.bulk_send_annotated_tags(repo, objects[GIT_OBJ_TAG]) - else: - self.log.info('Not sending releases') - - if self.config['send_occurrences']: - self.bulk_send_refs(repo, refs) - else: - self.log.info('Not sending occurrences') - - def process(self, repo_path, origin_url, authority_id, validity): - # Open repository - repo = self.open_repo(repo_path) - - # Add origin to storage if needed, use the one from config if not - origin = self.repo_origin(repo, origin_url) - - # Create fetch_history - fetch_history = self.open_fetch_history(origin['id']) - closed = False - - try: - # Parse all the refs from our repo - refs = self.list_repo_refs(repo, origin['id'], authority_id, - validity) - - if not refs: - self.log.info('Skipping empty repository %s' % repo_path, - extra={ - 'swh_type': 'git_repo_list_refs', - 'swh_repo': repo_path, - 'swh_num_refs': 0, - }) - # End fetch_history - self.close_fetch_history_success(fetch_history, {}, refs) - closed = True - return - else: - self.log.info('Listed %d refs for repo %s' % ( - len(refs), repo_path), extra={ - 'swh_type': 'git_repo_list_refs', - 'swh_repo': repo_path, - 'swh_num_refs': len(refs), - }) - - # We want to load the repository, walk all the objects - objects = self.list_repo_objs(repo) - - # Finally, load the repository - self.load_repo(repo, objects, refs, origin['id']) - - # End fetch_history - self.close_fetch_history_success(fetch_history, objects, refs) - closed = True - - finally: - if not closed: - self.close_fetch_history_failure(fetch_history) + def eventful(self): + """Whether the load was eventful""" + return True + +if __name__ == '__main__': + import logging + import sys + + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s %(process)d %(message)s' + ) + loader = GitLoader() + + origin_url = sys.argv[1] + directory = sys.argv[2] + fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) + + print(loader.load(origin_url, directory, fetch_date)) diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index d9bf18d..98e2333 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,119 +1,38 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import os +import dateutil.parser from swh.scheduler.task import Task -from .loader import BulkLoader +from .loader import GitLoader from .updater import BulkUpdater -class LoadGitRepository(Task): - """Import a git repository to Software Heritage""" - - task_queue = 'swh_loader_git' - - CONFIG_BASE_FILENAME = 'loader/git.ini' - ADDITIONAL_CONFIG = {} - - def __init__(self): - self.config = BulkLoader.parse_config_file( - base_filename=self.CONFIG_BASE_FILENAME, - additional_configs=[self.ADDITIONAL_CONFIG], - ) - - def run(self, repo_path, origin_url, authority_id, validity): - """Import a git repository""" - loader = BulkLoader(self.config) - loader.log = self.log - - loader.process(repo_path, origin_url, authority_id, validity) - - +# TODO: rename to LoadRemoteGitRepository class UpdateGitRepository(Task): - """Import a git repository as an update""" + """Import a git repository from a remote location""" task_queue = 'swh_loader_git' - CONFIG_BASE_FILENAME = 
'loader/git-updater.ini' - ADDITIONAL_CONFIG = {} - - def __init__(self): - self.config = BulkUpdater.parse_config_file( - base_filename=self.CONFIG_BASE_FILENAME, - additional_configs=[self.ADDITIONAL_CONFIG], - ) - def run(self, repo_url, base_url=None): """Import a git repository""" - loader = BulkUpdater(self.config) + loader = BulkUpdater() loader.log = self.log - return loader.process(repo_url, base_url) - + return loader.load(repo_url, base_url) -class LoadGitHubRepository(LoadGitRepository): - """Import a github repository to Software Heritage""" +class LoadDiskGitRepository(Task): + """Import a git repository from disk""" task_queue = 'swh_loader_git' - CONFIG_BASE_FILENAME = 'loader/github.ini' - ADDITIONAL_CONFIG = { - 'github_basepath': ('str', '/srv/storage/space/data/github'), - 'authority_id': ('str', '5f4d4c51-498a-4e28-88b3-b3e4e8396cba'), - 'default_validity': ('str', '1970-01-01 00:00:00+00'), - } - - def run(self, repo_fullname): - authority_id = self.config['authority_id'] - validity = self.config['default_validity'] + def run(self, origin_url, directory, date): + """Import a git repository, cloned in `directory` from `origin_url` at + `date`.""" - repo_path = os.path.join(self.config['github_basepath'], - repo_fullname[0], repo_fullname) - - witness_file = os.path.join(repo_path, 'witness') - if os.path.exists(witness_file): - validity_timestamp = os.stat(witness_file).st_mtime - validity = '%s+00' % datetime.datetime.utcfromtimestamp( - validity_timestamp) - - origin_url = 'https://github.com/%s' % repo_fullname - - super().run(repo_path, origin_url, authority_id, validity) - - -class LoadGitHubRepositoryReleases(LoadGitHubRepository): - """Import a GitHub repository to SoftwareHeritage, only with releases""" - - task_queue = 'swh_loader_git_express' - - def __init__(self): - super(self.__class__, self).__init__() - - self.config.update({ - 'send_contents': False, - 'send_directories': False, - 'send_revisions': False, - 'send_releases': True, - 'send_occurrences': False, - }) - - -class LoadGitHubRepositoryContents(LoadGitHubRepository): - """Import a GitHub repository to SoftwareHeritage, only with contents""" - - task_queue = 'swh_loader_git_express' - - def __init__(self): - super(self.__class__, self).__init__() + loader = GitLoader() + loader.log = self.log - self.config.update({ - 'send_contents': True, - 'send_directories': False, - 'send_revisions': False, - 'send_releases': False, - 'send_occurrences': False, - }) + return loader.load(origin_url, directory, dateutil.parser.parse(date)) diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py index c634333..fddf249 100644 --- a/swh/loader/git/tests/test_converters.py +++ b/swh/loader/git/tests/test_converters.py @@ -1,199 +1,170 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import subprocess import tempfile import unittest -import datetime from nose.tools import istest -import pygit2 +import dulwich.repo import swh.loader.git.converters as converters -from swh.core.hashutil import hex_to_hash +from swh.core.hashutil import bytehex_to_hash, hex_to_hash class TestConverters(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.repo_path = tempfile.mkdtemp() - cls.repo = pygit2.init_repository(cls.repo_path, bare=True) + 
cls.repo = dulwich.repo.Repo.init_bare(cls.repo_path) fast_export = os.path.join(os.path.dirname(__file__), '../../../../..', 'swh-storage-testdata', 'git-repos', 'example-submodule.fast-export.xz') xz = subprocess.Popen( ['xzcat'], stdin=open(fast_export, 'rb'), stdout=subprocess.PIPE, ) git = subprocess.Popen( ['git', 'fast-import', '--quiet'], stdin=xz.stdout, cwd=cls.repo_path, ) # flush stdout of xz xz.stdout.close() git.communicate() @classmethod def tearDownClass(cls): super().tearDownClass() shutil.rmtree(cls.repo_path) print(cls.repo_path) def setUp(self): super().setUp() - self.blob_id = pygit2.Oid( - hex='28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0') + self.blob_id = b'28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0' self.blob = { - 'sha1_git': self.blob_id.raw, + 'sha1_git': bytehex_to_hash(self.blob_id), 'sha1': hex_to_hash('4850a3420a2262ff061cb296fb915430fa92301c'), 'sha256': hex_to_hash('fee7c8a485a10321ad94b64135073cb5' '5f22cb9f57fa2417d2adfb09d310adef'), 'data': (b'[submodule "example-dependency"]\n' b'\tpath = example-dependency\n' b'\turl = https://github.com/githubtraining/' b'example-dependency.git\n'), 'length': 124, 'status': 'visible', } self.blob_hidden = { - 'sha1_git': self.blob_id.raw, + 'sha1_git': bytehex_to_hash(self.blob_id), 'length': 124, 'status': 'absent', 'reason': 'Content too large', 'origin': None, } @istest def blob_to_content(self): - content = converters.blob_to_content(self.blob_id, self.repo) + content = converters.dulwich_blob_to_content(self.repo[self.blob_id]) self.assertEqual(self.blob, content) @istest def blob_to_content_absent(self): max_length = self.blob['length'] - 1 - content = converters.blob_to_content(self.blob_id, self.repo, - max_content_size=max_length) + content = converters.dulwich_blob_to_content( + self.repo[self.blob_id], max_content_size=max_length) self.assertEqual(self.blob_hidden, content) @istest def commit_to_revision(self): - sha1 = '9768d0b576dbaaecd80abedad6dfd0d72f1476da' - commit = self.repo.revparse_single(sha1) + sha1 = b'9768d0b576dbaaecd80abedad6dfd0d72f1476da' - # when - actual_revision = converters.commit_to_revision(commit.id, self.repo) + revision = converters.dulwich_commit_to_revision(self.repo[sha1]) - offset = datetime.timedelta(minutes=120) - tzoffset = datetime.timezone(offset) expected_revision = { 'id': hex_to_hash('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), 'directory': b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca', 'type': 'git', 'committer': { 'name': b'Stefano Zacchiroli', + 'fullname': b'Stefano Zacchiroli ', 'email': b'zack@upsilon.cc', }, 'author': { 'name': b'Stefano Zacchiroli', + 'fullname': b'Stefano Zacchiroli ', 'email': b'zack@upsilon.cc', }, - 'committer_date': datetime.datetime(2015, 9, 24, 10, 36, 5, - tzinfo=tzoffset), + 'committer_date': { + 'negative_utc': None, + 'timestamp': 1443083765, + 'offset': 120, + }, 'message': b'add submodule dependency\n', 'metadata': None, - 'date': datetime.datetime(2015, 9, 24, 10, 36, 5, - tzinfo=tzoffset), + 'date': { + 'negative_utc': None, + 'timestamp': 1443083765, + 'offset': 120, + }, 'parents': [ b'\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r' ], 'synthetic': False, } - # then - self.assertEquals(actual_revision, expected_revision) - self.assertEquals(offset, expected_revision['date'].utcoffset()) - self.assertEquals(offset, - expected_revision['committer_date'].utcoffset()) - - @istest - def ref_to_occurrence_1(self): - # when - actual_occ = converters.ref_to_occurrence({ - 'id': 'some-id', - 'branch': 'some/branch' - }) - # then 
- self.assertEquals(actual_occ, { - 'id': 'some-id', - 'branch': b'some/branch' - }) - - @istest - def ref_to_occurrence_2(self): - # when - actual_occ = converters.ref_to_occurrence({ - 'id': 'some-id', - 'branch': b'some/branch' - }) - - # then - self.assertEquals(actual_occ, { - 'id': 'some-id', - 'branch': b'some/branch' - }) + self.assertEquals(revision, expected_revision) @istest def author_line_to_author(self): tests = { b'a <b@c.com>': { 'name': b'a', 'email': b'b@c.com', 'fullname': b'a <b@c.com>', }, b'<foo@bar.com>': { 'name': None, 'email': b'foo@bar.com', 'fullname': b'<foo@bar.com>', }, b'malformed <email': { 'name': b'malformed', 'email': b'email', 'fullname': b'malformed <email', }, b'trailing <sp@c.e> ': { 'name': b'trailing', 'email': b'sp@c.e', 'fullname': b'trailing <sp@c.e> ', }, b'no<sp@c.e>': { 'name': b'no', 'email': b'sp@c.e', 'fullname': b'no<sp@c.e>', }, b' <>': { 'name': b'', 'email': b'', 'fullname': b' <>', }, } for author in sorted(tests): parsed_author = tests[author] self.assertEquals(parsed_author, converters.parse_author(author)) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index a8852b3..58a2958 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,661 +1,421 @@ -# Copyright (C) 2015 The Software Heritage developers +# Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from io import BytesIO import logging +import os +import pickle import sys -import traceback -import uuid from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater -import psycopg2 -import requests -from retrying import retry from urllib.parse import urlparse -from swh.core import config, hashutil -from swh.storage import get_storage +from swh.core import hashutil -from . import converters +from . 
import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commit): """When querying for a commit's parents, we fill the cache to a depth of 100 commits.""" root_rev = hashutil.bytehex_to_hash(commit) for rev, parents in self.storage.revision_shortlog([root_rev], 100): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" if commit not in self._parents_cache: self._fill_parents_cache(commit) return self._parents_cache.get(commit, []) def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret -def send_in_packets(source_list, formatter, sender, packet_size, - packet_size_bytes=None, *args, **kwargs): - """Send objects from `source_list`, passed through `formatter` (with - extra args *args, **kwargs), using the `sender`, in packets of - `packet_size` objects (and of max `packet_size_bytes`). 
- - """ - formatted_objects = [] - count = 0 - if not packet_size_bytes: - packet_size_bytes = 0 - for obj in source_list: - formatted_object = formatter(obj, *args, **kwargs) - if formatted_object: - formatted_objects.append(formatted_object) - else: - continue - if packet_size_bytes: - count += formatted_object['length'] - if len(formatted_objects) >= packet_size or count > packet_size_bytes: - sender(formatted_objects) - formatted_objects = [] - count = 0 - - if formatted_objects: - sender(formatted_objects) - - -def retry_loading(error): - """Retry policy when we catch a recoverable error""" - exception_classes = [ - # raised when two parallel insertions insert the same data. - psycopg2.IntegrityError, - # raised when uWSGI restarts and hungs up on the worker. - requests.exceptions.ConnectionError, - ] - - if not any(isinstance(error, exc) for exc in exception_classes): - return False - - logger = logging.getLogger('swh.loader.git.BulkLoader') - - error_name = error.__module__ + '.' + error.__class__.__name__ - logger.warning('Retry loading a batch', exc_info=False, extra={ - 'swh_type': 'storage_retry', - 'swh_exception_type': error_name, - 'swh_exception': traceback.format_exception( - error.__class__, - error, - error.__traceback__, - ), - }) - - return True - - -class BulkUpdater(config.SWHConfig): +class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater.ini' - DEFAULT_CONFIG = { - 'storage_class': ('str', 'remote_storage'), - 'storage_args': ('list[str]', ['http://localhost:5000/']), + ADDITIONAL_CONFIG = { + 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), + 'pack_storage_base': ('str', ''), + } - 'send_contents': ('bool', True), - 'send_directories': ('bool', True), - 'send_revisions': ('bool', True), - 'send_releases': ('bool', True), - 'send_occurrences': ('bool', True), + def store_pack_and_refs(self, pack_buffer, remote_refs): + """Store a pack for archival""" - 'content_packet_size': ('int', 10000), - 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), - 'directory_packet_size': ('int', 25000), - 'revision_packet_size': ('int', 100000), - 'release_packet_size': ('int', 100000), - 'occurrence_packet_size': ('int', 100000), + write_size = 8192 - 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), - } + origin_id = "%08d" % self.origin_id + year = str(self.fetch_date.year) - def __init__(self, config): - self.config = config - self.storage = get_storage(config['storage_class'], - config['storage_args']) - self.log = logging.getLogger('swh.loader.git.BulkLoader') - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_contents(self, content_list): - """Actually send properly formatted contents to the database""" - num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_directories(self, directory_list): - """Actually send properly formatted directories to the database""" - num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % 
num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_revisions(self, revision_list): - """Actually send properly formatted revisions to the database""" - num_revisions = len(revision_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - self.storage.revision_add(revision_list) - self.log.debug("Done sending %d revisions" % num_revisions, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'revision', - 'swh_num': num_revisions, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_releases(self, release_list): - """Actually send properly formatted releases to the database""" - num_releases = len(release_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - self.storage.release_add(release_list) - self.log.debug("Done sending %d releases" % num_releases, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'release', - 'swh_num': num_releases, - 'swh_id': log_id, - }) - - @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) - def send_occurrences(self, occurrence_list): - """Actually send properly formatted occurrences to the database""" - num_occurrences = len(occurrence_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) - self.storage.occurrence_add(occurrence_list) - self.log.debug("Done sending %d occurrences" % num_occurrences, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'occurrence', - 'swh_num': num_occurrences, - 'swh_id': log_id, - }) + pack_dir = os.path.join( + self.config['pack_storage_base'], + origin_id, + year, + ) + pack_name = "%s.pack" % self.fetch_date.isoformat() + refs_name = "%s.refs" % self.fetch_date.isoformat() + + os.makedirs(pack_dir) + with open(os.path.join(pack_dir, pack_name), 'xb') as f: + while True: + r = pack_buffer.read(write_size) + if not r: + break + f.write(r) + + pack_buffer.seek(0) + + with open(os.path.join(pack_dir, refs_name), 'xb') as f: + pickle.dump(remote_refs, f) def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = SWHRepoRepresentation(self.storage, base_origin_id, base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for 
repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) + + if self.config['pack_storage_base']: + self.store_pack_and_refs(pack_buffer, remote_refs) + return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } - def get_origin(self, origin_url): - origin = converters.origin_url_to_origin(origin_url) - - return self.storage.origin_get(origin) + def list_pack(self, pack_data, pack_size): + id_to_type = {} + type_to_ids = defaultdict(set) - def get_or_create_origin(self, origin_url): - origin = converters.origin_url_to_origin(origin_url) + inflater = self.get_inflater() - origin['id'] = self.storage.origin_add_one(origin) - - return origin - - def create_origin(self, origin_url): - log_id = str(uuid.uuid4()) - self.log.debug('Creating origin for %s' % origin_url, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - origin = self.get_or_create_origin(origin_url) - self.log.debug('Done creating origin for %s' % origin_url, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'origin', - 'swh_num': 1, - 'swh_id': log_id - }) - - return origin - - def bulk_send_blobs(self, inflater, origin_id): - """Format blobs as swh contents and send them to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - max_content_size = self.config['content_size_limit'] + for obj in inflater: + type, id = obj.type_name, obj.id + id_to_type[id] = type + type_to_ids[type].add(id) - send_in_packets(inflater, converters.dulwich_blob_to_content, - self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes, - log=self.log, max_content_size=max_content_size, - origin_id=origin_id) + return id_to_type, type_to_ids - def bulk_send_trees(self, inflater): - """Format trees as swh directories and send them to the database""" - packet_size = self.config['directory_packet_size'] + def prepare(self, origin_url, base_url=None): + origin = converters.origin_url_to_origin(origin_url) + base_origin = converters.origin_url_to_origin(base_url) - send_in_packets(inflater, converters.dulwich_tree_to_directory, - self.send_directories, packet_size, - log=self.log) + base_occurrences = [] + base_origin_id = origin_id = None - def bulk_send_commits(self, inflater): - """Format commits as swh revisions and send them to the database""" - packet_size = self.config['revision_packet_size'] + db_origin = self.storage.origin_get(origin) + if db_origin: + base_origin_id = origin_id = db_origin['id'] - send_in_packets(inflater, converters.dulwich_commit_to_revision, - self.send_revisions, packet_size, - log=self.log) + if origin_id: + base_occurrences = self.storage.occurrence_get(origin_id) - def bulk_send_tags(self, inflater): - """Format annotated tags (dulwich.objects.Tag objects) as swh releases and send - them to the database - """ - packet_size = self.config['release_packet_size'] + if base_url and not base_occurrences: + base_origin = self.storage.origin_get(base_origin) + if 
base_origin: + base_origin_id = base_origin['id'] + base_occurrences = self.storage.occurrence_get(base_origin_id) - send_in_packets(inflater, converters.dulwich_tag_to_release, - self.send_releases, packet_size, - log=self.log) + self.base_occurrences = list(sorted(base_occurrences, + key=lambda occ: occ['branch'])) + self.base_origin_id = base_origin_id + self.origin = origin - def bulk_send_refs(self, refs): - """Format git references as swh occurrences and send them to the - database - """ - packet_size = self.config['occurrence_packet_size'] - - send_in_packets(refs, lambda x: x, self.send_occurrences, packet_size) - - def open_fetch_history(self, origin_id): - return self.storage.fetch_history_start(origin_id) - - def close_fetch_history_success(self, fetch_history_id, objects, refs): - data = { - 'status': True, - 'result': { - 'contents': len(objects[b'blob']), - 'directories': len(objects[b'tree']), - 'revisions': len(objects[b'commit']), - 'releases': len(objects[b'tag']), - 'occurrences': len(refs), - }, - } - return self.storage.fetch_history_end(fetch_history_id, data) + def get_origin(self): + return self.origin - def close_fetch_history_failure(self, fetch_history_id): - import traceback - data = { - 'status': False, - 'stderr': traceback.format_exc(), - } - return self.storage.fetch_history_end(fetch_history_id, data) + def fetch_data(self): + def do_progress(msg): + sys.stderr.buffer.write(msg) + sys.stderr.flush() - def get_inflater(self, pack_buffer, pack_size): - """Reset the pack buffer and get an object inflater from it""" - pack_buffer.seek(0) - return PackInflater.for_pack_data( - PackData.from_file(pack_buffer, pack_size)) + self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) - def list_pack(self, pack_data, pack_size): - id_to_type = {} - type_to_ids = defaultdict(set) + fetch_info = self.fetch_pack_from_origin( + self.origin['url'], self.base_origin_id, self.base_occurrences, + do_progress) - inflater = self.get_inflater(pack_data, pack_size) + self.pack_buffer = fetch_info['pack_buffer'] + self.pack_size = fetch_info['pack_size'] - for obj in inflater: - type, id = obj.type_name, obj.id - id_to_type[id] = type - type_to_ids[type].add(id) + self.remote_refs = fetch_info['remote_refs'] + self.local_refs = fetch_info['local_refs'] + if not self.remote_refs: + raise ValueError('Handle no remote refs') - return id_to_type, type_to_ids + origin_url = self.origin['url'] - def list_refs(self, remote_refs, local_refs, id_to_type, origin_id, date): - ret = [] - for ref in remote_refs: - ret_ref = local_refs[ref].copy() - ret_ref.update({ - 'branch': ref, - 'origin': origin_id, - 'date': date, + self.log.info('Listed %d refs for repo %s' % ( + len(self.remote_refs), origin_url), extra={ + 'swh_type': 'git_repo_list_refs', + 'swh_repo': origin_url, + 'swh_num_refs': len(self.remote_refs), }) - if not ret_ref['target_type']: - target_type = id_to_type[ret_ref['target']] - ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] - ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) + # We want to load the repository, walk all the objects + id_to_type, type_to_ids = self.list_pack(self.pack_buffer, + self.pack_size) - ret.append(ret_ref) + self.id_to_type = id_to_type + self.type_to_ids = type_to_ids - return ret - - def load_pack(self, pack_buffer, pack_size, refs, type_to_ids, origin_id): - if self.config['send_contents']: - if type_to_ids[b'blob']: - self.bulk_send_blobs(self.get_inflater(pack_buffer, pack_size), - origin_id) - else: - 
self.log.info('Not sending contents') + def get_inflater(self): + """Reset the pack buffer and get an object inflater from it""" + self.pack_buffer.seek(0) + return PackInflater.for_pack_data( + PackData.from_file(self.pack_buffer, self.pack_size)) - if self.config['send_directories']: - if type_to_ids[b'tree']: - self.bulk_send_trees(self.get_inflater(pack_buffer, pack_size)) - else: - self.log.info('Not sending directories') + def has_contents(self): + return bool(self.type_to_ids[b'blob']) - if self.config['send_revisions']: - if type_to_ids[b'commit']: - self.bulk_send_commits(self.get_inflater(pack_buffer, - pack_size)) - else: - self.log.info('Not sending revisions') + def get_contents(self): + """Format the blobs from the git repository as swh contents""" + max_content_size = self.config['content_size_limit'] + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'blob': + continue - if self.config['send_releases']: - if type_to_ids[b'tag']: - self.bulk_send_tags(self.get_inflater(pack_buffer, pack_size)) - else: - self.log.info('Not sending releases') + yield converters.dulwich_blob_to_content( + raw_obj, log=self.log, max_content_size=max_content_size, + origin_id=self.origin_id) - if self.config['send_occurrences']: - self.bulk_send_refs(refs) - else: - self.log.info('Not sending occurrences') + def has_directories(self): + return bool(self.type_to_ids[b'tree']) - def process(self, origin_url, base_url): - eventful = False + def get_directories(self): + """Format the trees as swh directories""" + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'tree': + continue - date = datetime.datetime.now(tz=datetime.timezone.utc) + yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) - origin = self.create_origin(origin_url) - base_origin = origin - base_occurrences = list(self.storage.occurrence_get(origin['id'])) + def has_revisions(self): + return bool(self.type_to_ids[b'commit']) - original_heads = list(sorted(base_occurrences, - key=lambda h: h['branch'])) + def get_revisions(self): + """Format commits as swh revisions""" + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'commit': + continue - if base_url and not original_heads: - base_origin = self.get_origin(base_url) - base_occurrences = None + yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) - # Create fetch_history - fetch_history = self.open_fetch_history(origin['id']) - closed = False + def has_releases(self): + return bool(self.type_to_ids[b'tag']) - def do_progress(msg): - sys.stderr.buffer.write(msg) - sys.stderr.flush() + def get_releases(self): + """Retrieve all the release objects from the git repository""" + for raw_obj in self.get_inflater(): + if raw_obj.type_name != b'tag': + continue - try: - fetch_info = self.fetch_pack_from_origin( - origin_url, base_origin['id'], base_occurrences, do_progress) - - pack_buffer = fetch_info['pack_buffer'] - pack_size = fetch_info['pack_size'] - - remote_refs = fetch_info['remote_refs'] - local_refs = fetch_info['local_refs'] - if not remote_refs: - self.log.info('Skipping empty repository %s' % origin_url, - extra={ - 'swh_type': 'git_repo_list_refs', - 'swh_repo': origin_url, - 'swh_num_refs': 0, - }) - # End fetch_history - self.close_fetch_history_success(fetch_history, - defaultdict(set), []) - closed = True - - # If the original repo was not empty, then the run was eventful - return bool(original_heads) - else: - self.log.info('Listed %d refs for repo %s' % ( - len(remote_refs), origin_url), extra={ - 
'swh_type': 'git_repo_list_refs', - 'swh_repo': origin_url, - 'swh_num_refs': len(remote_refs), - }) + yield converters.dulwich_tag_to_release(raw_obj, log=self.log) - # We want to load the repository, walk all the objects - id_to_type, type_to_ids = self.list_pack(pack_buffer, pack_size) + def has_occurrences(self): + return bool(self.remote_refs) - # Parse the remote references and add info from the local ones - refs = self.list_refs(remote_refs, local_refs, - id_to_type, origin['id'], date) + def get_occurrences(self): + ret = [] + for ref in self.remote_refs: + ret_ref = self.local_refs[ref].copy() + ret_ref.update({ + 'branch': ref, + 'origin': self.origin_id, + 'date': self.fetch_date, + }) + if not ret_ref['target_type']: + target_type = self.id_to_type[ret_ref['target']] + ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] - # Finally, load the repository - self.load_pack(pack_buffer, pack_size, refs, type_to_ids, - origin['id']) + ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) - end_heads = list(self.storage.occurrence_get(origin['id'])) - end_heads.sort(key=lambda h: h['branch']) + ret.append(ret_ref) - eventful = original_heads != end_heads + return ret - # End fetch_history - self.close_fetch_history_success(fetch_history, type_to_ids, refs) - closed = True + def get_fetch_history_result(self): + return { + 'contents': len(self.type_to_ids[b'blob']), + 'directories': len(self.type_to_ids[b'tree']), + 'revisions': len(self.type_to_ids[b'commit']), + 'releases': len(self.type_to_ids[b'tag']), + 'occurrences': len(self.remote_refs), + } - finally: - if not closed: - self.close_fetch_history_failure(fetch_history) + def eventful(self): + """The load was eventful if the current occurrences are different to + the ones we retrieved at the beginning of the run""" + current_occurrences = list(sorted( + self.storage.occurrence_get(self.origin_id), + key=lambda occ: occ['branch'], + )) - return eventful + return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) - config = BulkUpdater.parse_config_file( - base_filename='loader/git-updater.ini' - ) - - bulkupdater = BulkUpdater(config) + bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] - print(bulkupdater.process(origin_url, base_url)) + print(bulkupdater.load(origin_url, base_url)) diff --git a/swh/loader/git/utils.py b/swh/loader/git/utils.py deleted file mode 100644 index e6fb155..0000000 --- a/swh/loader/git/utils.py +++ /dev/null @@ -1,89 +0,0 @@ -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import datetime -import glob -import os -import subprocess - -from collections import defaultdict - -from pygit2 import Oid - - -def format_date(signature): - """Convert the date from a signature to a datetime""" - tz = datetime.timezone(datetime.timedelta(minutes=signature.offset)) - return datetime.datetime.fromtimestamp(signature.time, tz) - - -def list_objects_from_packfile_index(packfile_index): - """List the objects indexed by this packfile, in packfile offset - order. 
- """ - input_file = open(packfile_index, 'rb') - - with subprocess.Popen( - ['/usr/bin/git', 'show-index'], - stdin=input_file, - stdout=subprocess.PIPE, - ) as process: - - data = [] - - for line in process.stdout.readlines(): - # git show-index returns the line as: - # () - line_components = line.split() - offset = int(line_components[0]) - object_id = line_components[1] - - data.append((offset, object_id)) - - yield from (Oid(hex=object_id.decode('ascii')) - for _, object_id in sorted(data)) - - input_file.close() - - -def simple_list_objects(repo): - """List the objects in a given repository. Watch out for duplicates!""" - objects_dir = os.path.join(repo.path, 'objects') - # Git hashes are 40-character long - objects_glob = os.path.join(objects_dir, '[0-9a-f]' * 2, '[0-9a-f]' * 38) - - packfile_dir = os.path.join(objects_dir, 'pack') - - if os.path.isdir(packfile_dir): - for packfile_index in os.listdir(packfile_dir): - if not packfile_index.endswith('.idx'): - # Not an index file - continue - packfile_index_path = os.path.join(packfile_dir, packfile_index) - yield from list_objects_from_packfile_index(packfile_index_path) - - for object_file in glob.glob(objects_glob): - # Rebuild the object id as the last two components of the path - yield Oid(hex=''.join(object_file.split(os.path.sep)[-2:])) - - -def list_objects(repo): - """List the objects in a given repository, removing duplicates""" - seen = set() - for oid in simple_list_objects(repo): - if oid not in seen: - yield oid - seen.add(oid) - - -def get_objects_per_object_type(repo): - """Get all the (pygit2-parsed) objects from repo per object type""" - objects_per_object_type = defaultdict(list) - - for object_id in list_objects(repo): - object = repo[object_id] - objects_per_object_type[object.type].append(object_id) - - return objects_per_object_type diff --git a/version.txt b/version.txt index 235791b..57d6dd8 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.13-0-gad33bb3 \ No newline at end of file +v0.0.14-0-g47ce8b3 \ No newline at end of file