diff --git a/PKG-INFO b/PKG-INFO index 29c07a8..c749036 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.23 +Version: 0.0.24 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/bin/send-url.py b/bin/send-url.py deleted file mode 100755 index 2818e65..0000000 --- a/bin/send-url.py +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env python3 - -import click -import logging -import json - -from swh.scheduler.celery_backend.config import app -from swh.loader.git import tasks # noqa - - -@click.command() -@click.option('--json-file', help="Json file from github api") -@click.option('--queue', default="swh.loader.git.tasks.ReaderGitRepository", - help="Destination queue") -@click.option('--limit', default=None, help='limit on urls to send') -def main(json_file, queue, limit): - logging.basicConfig( - level=logging.DEBUG, - format='%(asctime)s %(process)d %(message)s' - ) - if limit: - limit = int(limit) - - repos = json.loads(open(json_file).read()) - - task_destination = app.tasks[queue] - - count = 0 - for repo in repos['items']: - url = repo['html_url'] - count += 1 - task_destination.delay(url) - logging.info(url) - if limit and count >= limit: - return - - -if __name__ == '__main__': - main() diff --git a/debian/control b/debian/control index f0dde19..5d78de5 100644 --- a/debian/control +++ b/debian/control @@ -1,28 +1,28 @@ Source: swh-loader-git Maintainer: Software Heritage developers Section: python Priority: optional Build-Depends: debhelper (>= 9), dh-python, python3-all, python3-nose, python3-dulwich, python3-retrying, python3-setuptools, python3-click, python3-swh.core (>= 0.0.7~), python3-swh.model (>= 0.0.3~), python3-swh.scheduler, - python3-swh.storage (>= 0.0.50~), + python3-swh.storage (>= 0.0.76~), python3-vcversioner Standards-Version: 3.9.6 Homepage: https://forge.softwareheritage.org/diffusion/DLDG/ Package: python3-swh.loader.git Architecture: all Depends: python3-swh.core (>= 0.0.7~), - python3-swh.storage (>= 0.0.50~), + python3-swh.storage (>= 0.0.76~), python3-swh.model (>= 0.0.3~), ${misc:Depends}, ${python3:Depends} Description: Software Heritage Git loader diff --git a/requirements.txt b/requirements.txt index e7b0a99..7cc1593 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ dulwich retrying vcversioner click swh.core >= 0.0.7 swh.model >= 0.0.3 swh.scheduler -swh.storage >= 0.0.50 +swh.storage >= 0.0.76 diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index 29c07a8..c749036 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.23 +Version: 0.0.24 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt index e83a1fa..6209d26 100644 --- a/swh.loader.git.egg-info/SOURCES.txt +++ b/swh.loader.git.egg-info/SOURCES.txt @@ -1,39 +1,38 @@ .gitignore .gitmodules AUTHORS LICENSE MANIFEST.in Makefile README requirements.txt setup.py version.txt bin/dir-git-repo-meta.sh -bin/send-url.py debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/attic/api-backend-protocol.txt docs/attic/git-loading-design.txt resources/local-loader-git.ini resources/remote-loader-git.ini resources/updater.ini resources/test/back.ini resources/test/db-manager.ini scratch/analyse-profile.py swh.loader.git.egg-info/PKG-INFO swh.loader.git.egg-info/SOURCES.txt swh.loader.git.egg-info/dependency_links.txt swh.loader.git.egg-info/requires.txt swh.loader.git.egg-info/top_level.txt swh/loader/git/__init__.py swh/loader/git/base.py swh/loader/git/converters.py swh/loader/git/loader.py swh/loader/git/reader.py swh/loader/git/tasks.py swh/loader/git/updater.py swh/loader/git/tests/test_converters.py \ No newline at end of file diff --git a/swh.loader.git.egg-info/requires.txt b/swh.loader.git.egg-info/requires.txt index c6ecc1e..3a1c50a 100644 --- a/swh.loader.git.egg-info/requires.txt +++ b/swh.loader.git.egg-info/requires.txt @@ -1,8 +1,8 @@ click dulwich retrying swh.core>=0.0.7 swh.model>=0.0.3 swh.scheduler -swh.storage>=0.0.50 +swh.storage>=0.0.76 vcversioner diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py index c25213d..fec0c58 100644 --- a/swh/loader/git/base.py +++ b/swh/loader/git/base.py @@ -1,440 +1,442 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import os import traceback import uuid import psycopg2 import requests from retrying import retry from swh.core import config from swh.storage import get_storage def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): """Send `objects`, using the `sender`, in packets of `packet_size` objects (and of max `packet_size_bytes`). """ formatted_objects = [] count = 0 if not packet_size_bytes: packet_size_bytes = 0 for obj in objects: if not obj: continue formatted_objects.append(obj) if packet_size_bytes: count += obj['length'] if len(formatted_objects) >= packet_size or count > packet_size_bytes: sender(formatted_objects) formatted_objects = [] count = 0 if formatted_objects: sender(formatted_objects) def retry_loading(error): """Retry policy when we catch a recoverable error""" exception_classes = [ # raised when two parallel insertions insert the same data. psycopg2.IntegrityError, # raised when uWSGI restarts and hungs up on the worker. requests.exceptions.ConnectionError, ] if not any(isinstance(error, exc) for exc in exception_classes): return False logger = logging.getLogger('swh.loader.git.BulkLoader') error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry loading a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return True class BaseLoader(config.SWHConfig): """This base class is a pattern for loaders. The external calling convention is as such: - instantiate the class once (loads storage and the configuration) - for each origin, call load with the origin-specific arguments (for instance, an origin URL). load calls several methods that must be implemented in subclasses: - prepare(*args, **kwargs) prepares the loader for the new origin - get_origin gets the origin object associated to the current loader - fetch_data downloads the necessary data from the origin - get_{contents,directories,revisions,releases,occurrences} retrieve each kind of object from the origin - has_* checks whether there are some objects to load for that object type - get_fetch_history_result retrieves the data to insert in the fetch_history table once the load was successful - eventful returns whether the load was eventful or not """ CONFIG_BASE_FILENAME = None DEFAULT_CONFIG = { - 'storage_class': ('str', 'remote_storage'), - 'storage_args': ('list[str]', ['http://localhost:5000/']), - + 'storage': ('dict', { + 'cls': 'remote', + 'args': { + 'url': 'http://localhost:5000/' + }, + }), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'save_data': ('bool', False), 'save_data_path': ('str', ''), 'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } ADDITIONAL_CONFIG = {} def __init__(self): self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) # Make sure the config is sane if self.config['save_data']: path = self.config['save_data_path'] os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) - self.storage = get_storage(self.config['storage_class'], - self.config['storage_args']) + self.storage = get_storage(**self.config['storage']) self.log = logging.getLogger('swh.loader.git.BulkLoader') def prepare(self, *args, **kwargs): """Prepare the data source to be loaded""" raise NotImplementedError def get_origin(self): """Get the origin that is currently being loaded""" raise NotImplementedError def fetch_data(self): """Fetch the data from the data source""" raise NotImplementedError def has_contents(self): """Checks whether we need to load contents""" return True def get_contents(self): """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self): """Checks whether we need to load directories""" return True def get_directories(self): """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self): """Checks whether we need to load revisions""" return True def get_revisions(self): """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self): """Checks whether we need to load releases""" return True def get_releases(self): """Get the releases that need to be loaded""" raise NotImplementedError def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" raise NotImplementedError def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" raise NotImplementedError def eventful(self): """Whether the load was eventful""" raise NotImplementedError def save_data(self): """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self): """The path to which we save the data""" if not hasattr(self, '__save_data_path'): origin_id = self.origin_id year = str(self.fetch_date.year) path = os.path.join( self.config['save_data_path'], "%04d" % (origin_id % 10000), "%08d" % origin_id, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database""" num_contents = len(content_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) self.storage.content_add(content_list) self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database""" num_directories = len(directory_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) self.storage.directory_add(directory_list) self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database""" num_revisions = len(revision_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) self.storage.revision_add(revision_list) self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database""" num_releases = len(release_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) self.storage.release_add(release_list) self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_occurrences(self, occurrence_list): """Actually send properly formatted occurrences to the database""" num_occurrences = len(occurrence_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) self.storage.occurrence_add(occurrence_list) self.log.debug("Done sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) def send_origin(self, origin): log_id = str(uuid.uuid4()) self.log.debug('Creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) origin_id = self.storage.origin_add_one(origin) self.log.debug('Done creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) return origin_id def send_all_contents(self, contents): """Send all the contents to the database""" packet_size = self.config['content_packet_size'] packet_size_bytes = self.config['content_packet_size_bytes'] send_in_packets(contents, self.send_contents, packet_size, packet_size_bytes=packet_size_bytes) def send_all_directories(self, directories): """Send all the directories to the database""" packet_size = self.config['directory_packet_size'] send_in_packets(directories, self.send_directories, packet_size) def send_all_revisions(self, revisions): """Send all the revisions to the database""" packet_size = self.config['revision_packet_size'] send_in_packets(revisions, self.send_revisions, packet_size) def send_all_releases(self, releases): """Send all the releases to the database """ packet_size = self.config['release_packet_size'] send_in_packets(releases, self.send_releases, packet_size) def send_all_occurrences(self, occurrences): """Send all the occurrences to the database """ packet_size = self.config['occurrence_packet_size'] send_in_packets(occurrences, self.send_occurrences, packet_size) def open_fetch_history(self): return self.storage.fetch_history_start(self.origin_id) def close_fetch_history_success(self, fetch_history_id, result): data = { 'status': True, 'result': result, } return self.storage.fetch_history_end(fetch_history_id, data) def close_fetch_history_failure(self, fetch_history_id): import traceback data = { 'status': False, 'stderr': traceback.format_exc(), } return self.storage.fetch_history_end(fetch_history_id, data) def load(self, *args, **kwargs): self.prepare(*args, **kwargs) origin = self.get_origin() self.origin_id = self.send_origin(origin) fetch_history_id = self.open_fetch_history() date_visit = datetime.datetime.now(tz=datetime.timezone.utc) origin_visit = self.storage.origin_visit_add( self.origin_id, date_visit) self.visit = origin_visit['visit'] try: self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) self.fetch_data() if self.config['save_data']: self.save_data() if self.config['send_contents'] and self.has_contents(): self.send_all_contents(self.get_contents()) if self.config['send_directories'] and self.has_directories(): self.send_all_directories(self.get_directories()) if self.config['send_revisions'] and self.has_revisions(): self.send_all_revisions(self.get_revisions()) if self.config['send_releases'] and self.has_releases(): self.send_all_releases(self.get_releases()) if self.config['send_occurrences'] and self.has_occurrences(): self.send_all_occurrences(self.get_occurrences()) self.close_fetch_history_success(fetch_history_id, self.get_fetch_history_result()) self.storage.origin_visit_update( self.origin_id, self.visit, status='full') except: self.close_fetch_history_failure(fetch_history_id) self.storage.origin_visit_update( self.origin_id, self.visit, status='partial') raise return self.eventful() diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index ed1e09e..f80a135 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,150 +1,150 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import dulwich.repo from swh.core import hashutil from . import base, converters class GitLoader(base.BaseLoader): """Load a git repository from a directory. """ - CONFIG_BASE_FILENAME = 'loader/git-loader.ini' + CONFIG_BASE_FILENAME = 'loader/git-loader' def prepare(self, origin_url, directory, fetch_date): self.origin_url = origin_url self.repo = dulwich.repo.Repo(directory) self.fetch_date = fetch_date def get_origin(self): """Get the origin that is currently being loaded""" return converters.origin_url_to_origin(self.origin_url) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def fetch_data(self): """Fetch the data from the data source""" type_to_ids = defaultdict(list) for oid in self.iter_objects(): type_name = self.repo[oid].type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_contents(self): """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] for oid in self.type_to_ids[b'blob']: yield converters.dulwich_blob_to_content( self.repo[oid], log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" for oid in self.type_to_ids[b'tree']: yield converters.dulwich_tree_to_directory( self.repo[oid], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" for oid in self.type_to_ids[b'commit']: yield converters.dulwich_commit_to_revision( self.repo[oid], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_releases(self): """Get the releases that need to be loaded""" for oid in self.type_to_ids[b'tag']: yield converters.dulwich_tag_to_release( self.repo[oid], log=self.log) def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" repo = self.repo origin_id = self.origin_id visit = self.visit for ref, target in repo.refs.as_dict().items(): target_type_name = repo[target].type_name target_type = converters.DULWICH_TYPES[target_type_name] yield { 'branch': ref, 'origin': origin_id, 'target': hashutil.bytehex_to_hash(target), 'target_type': target_type, 'visit': visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.repo.refs.allkeys()), } def save_data(self): """We already have the data locally, no need to save it""" pass def eventful(self): """Whether the load was eventful""" return True if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = GitLoader() origin_url = sys.argv[1] directory = sys.argv[2] fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, fetch_date)) diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py index b73d18d..634e23d 100644 --- a/swh/loader/git/reader.py +++ b/swh/loader/git/reader.py @@ -1,190 +1,204 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from collections import defaultdict from swh.core import hashutil, utils from .updater import BulkUpdater, SWHRepoRepresentation from . import converters class SWHRepoFullRepresentation(SWHRepoRepresentation): """Overridden representation of a swh repository to permit to read completely the remote repository. """ def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} self.heads = set() def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. In this particular context, we want to know everything. """ if not refs: return [] for target in refs.values(): self.heads.add(target) return self.filter_unwanted_refs(refs).values() def find_remote_ref_types_in_swh(self, remote_refs): """Find the known swh remote. In that particular context, we know nothing. """ return {} class DummyGraphWalker(object): """Dummy graph walker which claims that the client doesn’t have any objects. """ def ack(self, sha): pass def next(self): pass def __next__(self): pass class GitSha1RemoteReader(BulkUpdater): - """Disk git sha1 reader to dump only repo's content sha1 list. + """Read sha1 git from a remote repository and dump only repository's + content sha1 as list. """ CONFIG_BASE_FILENAME = 'loader/git-remote-reader' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), # don't want to store packs so empty 'next_task': ( 'dict', { 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask', 'batch_size': 100, 'destination': 'azure' } ) } def __init__(self): super().__init__(SWHRepoFullRepresentation) self.next_task = self.config['next_task'] self.batch_size = self.next_task['batch_size'] - self.task_destination = self.next_task.get('queue') + self.task_destination = self.next_task['queue'] self.destination = self.next_task['destination'] def graph_walker(self): return DummyGraphWalker() def prepare(self, origin_url, base_url=None): """Only retrieve information about the origin, set everything else to empty. """ ori = converters.origin_url_to_origin(origin_url) self.origin = self.storage.origin_get(ori) self.origin_id = self.origin['id'] self.base_occurrences = [] self.base_origin_id = self.origin['id'] def list_pack(self, pack_data, pack_size): """Override list_pack to only keep contents' sha1. Returns: id_to_type (dict): keys are sha1, values are their associated type type_to_ids (dict): keys are types, values are list of associated ids (sha1 for blobs) """ id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type = obj.type_name if type != b'blob': # don't keep other types continue # compute the sha1 (obj.id is the sha1_git) data = obj.as_raw_string() hashes = hashutil.hashdata(data, {'sha1'}) oid = hashes['sha1'] id_to_type[oid] = type type_to_ids[type].add(oid) return id_to_type, type_to_ids def load(self, *args, **kwargs): """Override the loading part which simply reads the repository's contents' sha1. Returns: - If the configuration holds a destination queue, send those - sha1s as batch of sha1s to it for consumption. Otherwise, - returns the list of discovered sha1s. + Returns the list of discovered sha1s for that origin. """ try: self.prepare(*args, **kwargs) except: self.log.error('Unknown repository, skipping...') return [] self.fetch_data() - data = self.type_to_ids[b'blob'] + return self.type_to_ids[b'blob'] - if not self.task_destination: # to stdout - return data + +class GitSha1RemoteReaderAndSendToQueue(GitSha1RemoteReader): + """Read sha1 git from a remote repository and dump only repository's + content sha1 as list and send batch of those sha1s to a celery + queue for consumption. + + """ + def load(self, *args, **kwargs): + """Retrieve the list of sha1s for a particular origin and send those + sha1s as group of sha1s to a specific queue. + + """ + data = super().load(*args, **kwargs) from swh.scheduler.celery_backend.config import app try: # optional dependency from swh.storage.archiver import tasks # noqa except ImportError: pass from celery import group task_destination = app.tasks[self.task_destination] groups = [] for ids in utils.grouper(data, self.batch_size): sig_ids = task_destination.s(destination=self.destination, batch=list(ids)) groups.append(sig_ids) group(groups).delay() return data @click.command() @click.option('--origin-url', help='Origin\'s url') -def main(origin_url, source): +@click.option('--send/--nosend', default=False, help='Origin\'s url') +def main(origin_url, send): logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) + if send: + loader = GitSha1RemoteReaderAndSendToQueue() + ids = loader.load(origin_url) + print('%s sha1s were sent to queue' % len(ids)) + return + loader = GitSha1RemoteReader() - ids = loader.load(origin_url, source) + ids = loader.load(origin_url) if ids: - count = 0 for oid in ids: - print(oid) - count += 1 - print("sha1s: %s" % count) + print(hashutil.hash_to_hex(oid)) if __name__ == '__main__': main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 6373378..6b979de 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,53 +1,53 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dateutil.parser from swh.scheduler.task import Task from .loader import GitLoader from .updater import BulkUpdater -from .reader import GitSha1RemoteReader +from .reader import GitSha1RemoteReaderAndSendToQueue # TODO: rename to LoadRemoteGitRepository class UpdateGitRepository(Task): """Import a git repository from a remote location""" task_queue = 'swh_loader_git' def run(self, repo_url, base_url=None): """Import a git repository""" loader = BulkUpdater() loader.log = self.log return loader.load(repo_url, base_url) class LoadDiskGitRepository(Task): """Import a git repository from disk""" task_queue = 'swh_loader_git' def run(self, origin_url, directory, date): """Import a git repository, cloned in `directory` from `origin_url` at `date`.""" loader = GitLoader() loader.log = self.log return loader.load(origin_url, directory, dateutil.parser.parse(date)) class ReaderGitRepository(Task): task_queue = 'swh_reader_git' def run(self, repo_url, base_url=None): """Read a git repository from a remote location and send sha1 to archival. """ - loader = GitSha1RemoteReader() + loader = GitSha1RemoteReaderAndSendToQueue() loader.log = self.log return loader.load(repo_url) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index 20eca63..4bd6a8a 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,422 +1,422 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import BytesIO import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from urllib.parse import urlparse from swh.core import hashutil from . import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commit): """When querying for a commit's parents, we fill the cache to a depth of 100 commits.""" root_rev = hashutil.bytehex_to_hash(commit) for rev, parents in self.storage.revision_shortlog([root_rev], 100): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" if commit not in self._parents_cache: self._fill_parents_cache(commit) return self._parents_cache.get(commit, []) def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" - CONFIG_BASE_FILENAME = 'loader/git-updater.ini' + CONFIG_BASE_FILENAME = 'loader/git-updater' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), } def __init__(self, repo_representation=SWHRepoRepresentation): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ super().__init__() self.repo_representation = repo_representation def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation(self.storage, base_origin_id, base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit, origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) base_occurrences = [] base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: base_occurrences = self.storage.occurrence_get(origin_id) if base_url and not base_occurrences: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] base_occurrences = self.storage.occurrence_get(base_origin_id) self.base_occurrences = list(sorted(base_occurrences, key=lambda occ: occ['branch'])) self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() fetch_info = self.fetch_pack_from_origin( self.origin['url'], self.base_origin_id, self.base_occurrences, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def save_data(self): """Store a pack for archival""" write_size = 8192 pack_dir = self.get_save_data_path() pack_name = "%s.pack" % self.fetch_date.isoformat() refs_name = "%s.refs" % self.fetch_date.isoformat() with open(os.path.join(pack_dir, pack_name), 'xb') as f: while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(self.remote_refs, f) def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_releases(self): """Retrieve all the release objects from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def has_occurrences(self): return bool(self.remote_refs) def get_occurrences(self): origin_id = self.origin_id visit = self.visit ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() ret_ref.update({ 'branch': ref, 'origin': origin_id, 'visit': visit, }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) ret.append(ret_ref) return ret def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.remote_refs), } def eventful(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" current_occurrences = list(sorted( self.storage.occurrence_get(self.origin_id), key=lambda occ: occ['branch'], )) return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url)) diff --git a/version.txt b/version.txt index e9a296b..e8e6905 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.23-0-gd6c853f \ No newline at end of file +v0.0.24-0-ga57dc1e \ No newline at end of file