diff --git a/PKG-INFO b/PKG-INFO index a95130a..29c07a8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.22 +Version: 0.0.23 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/bin/send-url.py b/bin/send-url.py new file mode 100755 index 0000000..2818e65 --- /dev/null +++ b/bin/send-url.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import click +import logging +import json + +from swh.scheduler.celery_backend.config import app +from swh.loader.git import tasks # noqa + + +@click.command() +@click.option('--json-file', help="Json file from github api") +@click.option('--queue', default="swh.loader.git.tasks.ReaderGitRepository", + help="Destination queue") +@click.option('--limit', default=None, help='limit on urls to send') +def main(json_file, queue, limit): + logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s %(process)d %(message)s' + ) + if limit: + limit = int(limit) + + repos = json.loads(open(json_file).read()) + + task_destination = app.tasks[queue] + + count = 0 + for repo in repos['items']: + url = repo['html_url'] + count += 1 + task_destination.delay(url) + logging.info(url) + if limit and count >= limit: + return + + +if __name__ == '__main__': + main() diff --git a/swh.loader.git.egg-info/PKG-INFO b/swh.loader.git.egg-info/PKG-INFO index a95130a..29c07a8 100644 --- a/swh.loader.git.egg-info/PKG-INFO +++ b/swh.loader.git.egg-info/PKG-INFO @@ -1,10 +1,10 @@ Metadata-Version: 1.0 Name: swh.loader.git -Version: 0.0.22 +Version: 0.0.23 Summary: Software Heritage git loader Home-page: https://forge.softwareheritage.org/diffusion/DCORE/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Description: UNKNOWN Platform: UNKNOWN diff --git a/swh.loader.git.egg-info/SOURCES.txt b/swh.loader.git.egg-info/SOURCES.txt index 6209d26..e83a1fa 100644 --- a/swh.loader.git.egg-info/SOURCES.txt +++ b/swh.loader.git.egg-info/SOURCES.txt @@ -1,38 +1,39 @@ .gitignore .gitmodules AUTHORS LICENSE MANIFEST.in Makefile README requirements.txt setup.py version.txt bin/dir-git-repo-meta.sh +bin/send-url.py debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/attic/api-backend-protocol.txt docs/attic/git-loading-design.txt resources/local-loader-git.ini resources/remote-loader-git.ini resources/updater.ini resources/test/back.ini resources/test/db-manager.ini scratch/analyse-profile.py swh.loader.git.egg-info/PKG-INFO swh.loader.git.egg-info/SOURCES.txt swh.loader.git.egg-info/dependency_links.txt swh.loader.git.egg-info/requires.txt swh.loader.git.egg-info/top_level.txt swh/loader/git/__init__.py swh/loader/git/base.py swh/loader/git/converters.py swh/loader/git/loader.py swh/loader/git/reader.py swh/loader/git/tasks.py swh/loader/git/updater.py swh/loader/git/tests/test_converters.py \ No newline at end of file diff --git a/swh/loader/git/base.py b/swh/loader/git/base.py index bee4412..c25213d 100644 --- a/swh/loader/git/base.py +++ b/swh/loader/git/base.py @@ -1,402 +1,440 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging +import os import traceback import uuid import psycopg2 import requests from retrying import retry from swh.core import config from swh.storage import get_storage def send_in_packets(objects, sender, packet_size, packet_size_bytes=None): """Send `objects`, using the `sender`, in packets of `packet_size` objects (and of max `packet_size_bytes`). """ formatted_objects = [] count = 0 if not packet_size_bytes: packet_size_bytes = 0 for obj in objects: if not obj: continue formatted_objects.append(obj) if packet_size_bytes: count += obj['length'] if len(formatted_objects) >= packet_size or count > packet_size_bytes: sender(formatted_objects) formatted_objects = [] count = 0 if formatted_objects: sender(formatted_objects) def retry_loading(error): """Retry policy when we catch a recoverable error""" exception_classes = [ # raised when two parallel insertions insert the same data. psycopg2.IntegrityError, # raised when uWSGI restarts and hungs up on the worker. requests.exceptions.ConnectionError, ] if not any(isinstance(error, exc) for exc in exception_classes): return False logger = logging.getLogger('swh.loader.git.BulkLoader') error_name = error.__module__ + '.' + error.__class__.__name__ logger.warning('Retry loading a batch', exc_info=False, extra={ 'swh_type': 'storage_retry', 'swh_exception_type': error_name, 'swh_exception': traceback.format_exception( error.__class__, error, error.__traceback__, ), }) return True class BaseLoader(config.SWHConfig): """This base class is a pattern for loaders. The external calling convention is as such: - instantiate the class once (loads storage and the configuration) - for each origin, call load with the origin-specific arguments (for instance, an origin URL). load calls several methods that must be implemented in subclasses: - prepare(*args, **kwargs) prepares the loader for the new origin - get_origin gets the origin object associated to the current loader - fetch_data downloads the necessary data from the origin - get_{contents,directories,revisions,releases,occurrences} retrieve each kind of object from the origin - has_* checks whether there are some objects to load for that object type - get_fetch_history_result retrieves the data to insert in the fetch_history table once the load was successful - eventful returns whether the load was eventful or not """ CONFIG_BASE_FILENAME = None DEFAULT_CONFIG = { 'storage_class': ('str', 'remote_storage'), 'storage_args': ('list[str]', ['http://localhost:5000/']), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), + 'save_data': ('bool', False), + 'save_data_path': ('str', ''), + 'content_packet_size': ('int', 10000), 'content_packet_size_bytes': ('int', 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100000), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } ADDITIONAL_CONFIG = {} def __init__(self): self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) + # Make sure the config is sane + if self.config['save_data']: + path = self.config['save_data_path'] + os.stat(path) + if not os.access(path, os.R_OK | os.W_OK): + raise PermissionError("Permission denied: %r" % path) + self.storage = get_storage(self.config['storage_class'], self.config['storage_args']) + self.log = logging.getLogger('swh.loader.git.BulkLoader') def prepare(self, *args, **kwargs): """Prepare the data source to be loaded""" raise NotImplementedError def get_origin(self): """Get the origin that is currently being loaded""" raise NotImplementedError def fetch_data(self): """Fetch the data from the data source""" raise NotImplementedError def has_contents(self): """Checks whether we need to load contents""" return True def get_contents(self): """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self): """Checks whether we need to load directories""" return True def get_directories(self): """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self): """Checks whether we need to load revisions""" return True def get_revisions(self): """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self): """Checks whether we need to load releases""" return True def get_releases(self): """Get the releases that need to be loaded""" raise NotImplementedError def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" raise NotImplementedError def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" raise NotImplementedError def eventful(self): """Whether the load was eventful""" raise NotImplementedError + def save_data(self): + """Save the data associated to the current load""" + raise NotImplementedError + + def get_save_data_path(self): + """The path to which we save the data""" + if not hasattr(self, '__save_data_path'): + origin_id = self.origin_id + year = str(self.fetch_date.year) + + path = os.path.join( + self.config['save_data_path'], + "%04d" % (origin_id % 10000), + "%08d" % origin_id, + year, + ) + + os.makedirs(path, exist_ok=True) + self.__save_data_path = path + + return self.__save_data_path + @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database""" num_contents = len(content_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) self.storage.content_add(content_list) self.log.debug("Done sending %d contents" % num_contents, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'content', 'swh_num': num_contents, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database""" num_directories = len(directory_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) self.storage.directory_add(directory_list) self.log.debug("Done sending %d directories" % num_directories, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'directory', 'swh_num': num_directories, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database""" num_revisions = len(revision_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) self.storage.revision_add(revision_list) self.log.debug("Done sending %d revisions" % num_revisions, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'revision', 'swh_num': num_revisions, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_releases(self, release_list): """Actually send properly formatted releases to the database""" num_releases = len(release_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) self.storage.release_add(release_list) self.log.debug("Done sending %d releases" % num_releases, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'release', 'swh_num': num_releases, 'swh_id': log_id, }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_occurrences(self, occurrence_list): """Actually send properly formatted occurrences to the database""" num_occurrences = len(occurrence_list) log_id = str(uuid.uuid4()) self.log.debug("Sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) self.storage.occurrence_add(occurrence_list) self.log.debug("Done sending %d occurrences" % num_occurrences, extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'occurrence', 'swh_num': num_occurrences, 'swh_id': log_id, }) def send_origin(self, origin): log_id = str(uuid.uuid4()) self.log.debug('Creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_start', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) origin_id = self.storage.origin_add_one(origin) self.log.debug('Done creating %s origin for %s' % (origin['type'], origin['url']), extra={ 'swh_type': 'storage_send_end', 'swh_content_type': 'origin', 'swh_num': 1, 'swh_id': log_id }) return origin_id def send_all_contents(self, contents): """Send all the contents to the database""" packet_size = self.config['content_packet_size'] packet_size_bytes = self.config['content_packet_size_bytes'] send_in_packets(contents, self.send_contents, packet_size, packet_size_bytes=packet_size_bytes) def send_all_directories(self, directories): """Send all the directories to the database""" packet_size = self.config['directory_packet_size'] send_in_packets(directories, self.send_directories, packet_size) def send_all_revisions(self, revisions): """Send all the revisions to the database""" packet_size = self.config['revision_packet_size'] send_in_packets(revisions, self.send_revisions, packet_size) def send_all_releases(self, releases): """Send all the releases to the database """ packet_size = self.config['release_packet_size'] send_in_packets(releases, self.send_releases, packet_size) def send_all_occurrences(self, occurrences): """Send all the occurrences to the database """ packet_size = self.config['occurrence_packet_size'] send_in_packets(occurrences, self.send_occurrences, packet_size) def open_fetch_history(self): return self.storage.fetch_history_start(self.origin_id) def close_fetch_history_success(self, fetch_history_id, result): data = { 'status': True, 'result': result, } return self.storage.fetch_history_end(fetch_history_id, data) def close_fetch_history_failure(self, fetch_history_id): import traceback data = { 'status': False, 'stderr': traceback.format_exc(), } return self.storage.fetch_history_end(fetch_history_id, data) def load(self, *args, **kwargs): self.prepare(*args, **kwargs) origin = self.get_origin() self.origin_id = self.send_origin(origin) fetch_history_id = self.open_fetch_history() date_visit = datetime.datetime.now(tz=datetime.timezone.utc) origin_visit = self.storage.origin_visit_add( self.origin_id, date_visit) self.visit = origin_visit['visit'] try: + self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) self.fetch_data() + if self.config['save_data']: + self.save_data() + if self.config['send_contents'] and self.has_contents(): self.send_all_contents(self.get_contents()) if self.config['send_directories'] and self.has_directories(): self.send_all_directories(self.get_directories()) if self.config['send_revisions'] and self.has_revisions(): self.send_all_revisions(self.get_revisions()) if self.config['send_releases'] and self.has_releases(): self.send_all_releases(self.get_releases()) if self.config['send_occurrences'] and self.has_occurrences(): self.send_all_occurrences(self.get_occurrences()) self.close_fetch_history_success(fetch_history_id, self.get_fetch_history_result()) self.storage.origin_visit_update( self.origin_id, self.visit, status='full') except: self.close_fetch_history_failure(fetch_history_id) self.storage.origin_visit_update( self.origin_id, self.visit, status='partial') raise return self.eventful() diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index 71e1282..ed1e09e 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,145 +1,150 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import dulwich.repo from swh.core import hashutil from . import base, converters class GitLoader(base.BaseLoader): """Load a git repository from a directory. """ CONFIG_BASE_FILENAME = 'loader/git-loader.ini' def prepare(self, origin_url, directory, fetch_date): self.origin_url = origin_url self.repo = dulwich.repo.Repo(directory) self.fetch_date = fetch_date def get_origin(self): """Get the origin that is currently being loaded""" return converters.origin_url_to_origin(self.origin_url) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def fetch_data(self): """Fetch the data from the data source""" type_to_ids = defaultdict(list) for oid in self.iter_objects(): type_name = self.repo[oid].type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_contents(self): """Get the contents that need to be loaded""" max_content_size = self.config['content_size_limit'] for oid in self.type_to_ids[b'blob']: yield converters.dulwich_blob_to_content( self.repo[oid], log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" for oid in self.type_to_ids[b'tree']: yield converters.dulwich_tree_to_directory( self.repo[oid], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" for oid in self.type_to_ids[b'commit']: yield converters.dulwich_commit_to_revision( self.repo[oid], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_releases(self): """Get the releases that need to be loaded""" for oid in self.type_to_ids[b'tag']: yield converters.dulwich_tag_to_release( self.repo[oid], log=self.log) def has_occurrences(self): """Checks whether we need to load occurrences""" return True def get_occurrences(self): """Get the occurrences that need to be loaded""" repo = self.repo origin_id = self.origin_id visit = self.visit for ref, target in repo.refs.as_dict().items(): target_type_name = repo[target].type_name target_type = converters.DULWICH_TYPES[target_type_name] yield { 'branch': ref, 'origin': origin_id, 'target': hashutil.bytehex_to_hash(target), 'target_type': target_type, 'visit': visit, } def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.repo.refs.allkeys()), } + def save_data(self): + """We already have the data locally, no need to save it""" + pass + def eventful(self): """Whether the load was eventful""" return True + if __name__ == '__main__': import logging import sys logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) loader = GitLoader() origin_url = sys.argv[1] directory = sys.argv[2] fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) print(loader.load(origin_url, directory, fetch_date)) diff --git a/swh/loader/git/reader.py b/swh/loader/git/reader.py index 634e23d..b73d18d 100644 --- a/swh/loader/git/reader.py +++ b/swh/loader/git/reader.py @@ -1,204 +1,190 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import click import logging from collections import defaultdict from swh.core import hashutil, utils from .updater import BulkUpdater, SWHRepoRepresentation from . import converters class SWHRepoFullRepresentation(SWHRepoRepresentation): """Overridden representation of a swh repository to permit to read completely the remote repository. """ def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} self.heads = set() def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. In this particular context, we want to know everything. """ if not refs: return [] for target in refs.values(): self.heads.add(target) return self.filter_unwanted_refs(refs).values() def find_remote_ref_types_in_swh(self, remote_refs): """Find the known swh remote. In that particular context, we know nothing. """ return {} class DummyGraphWalker(object): """Dummy graph walker which claims that the client doesn’t have any objects. """ def ack(self, sha): pass def next(self): pass def __next__(self): pass class GitSha1RemoteReader(BulkUpdater): - """Read sha1 git from a remote repository and dump only repository's - content sha1 as list. + """Disk git sha1 reader to dump only repo's content sha1 list. """ CONFIG_BASE_FILENAME = 'loader/git-remote-reader' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), 'pack_storage_base': ('str', ''), # don't want to store packs so empty 'next_task': ( 'dict', { 'queue': 'swh.storage.archiver.tasks.SWHArchiverToBackendTask', 'batch_size': 100, 'destination': 'azure' } ) } def __init__(self): super().__init__(SWHRepoFullRepresentation) self.next_task = self.config['next_task'] self.batch_size = self.next_task['batch_size'] - self.task_destination = self.next_task['queue'] + self.task_destination = self.next_task.get('queue') self.destination = self.next_task['destination'] def graph_walker(self): return DummyGraphWalker() def prepare(self, origin_url, base_url=None): """Only retrieve information about the origin, set everything else to empty. """ ori = converters.origin_url_to_origin(origin_url) self.origin = self.storage.origin_get(ori) self.origin_id = self.origin['id'] self.base_occurrences = [] self.base_origin_id = self.origin['id'] def list_pack(self, pack_data, pack_size): """Override list_pack to only keep contents' sha1. Returns: id_to_type (dict): keys are sha1, values are their associated type type_to_ids (dict): keys are types, values are list of associated ids (sha1 for blobs) """ id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type = obj.type_name if type != b'blob': # don't keep other types continue # compute the sha1 (obj.id is the sha1_git) data = obj.as_raw_string() hashes = hashutil.hashdata(data, {'sha1'}) oid = hashes['sha1'] id_to_type[oid] = type type_to_ids[type].add(oid) return id_to_type, type_to_ids def load(self, *args, **kwargs): """Override the loading part which simply reads the repository's contents' sha1. Returns: - Returns the list of discovered sha1s for that origin. + If the configuration holds a destination queue, send those + sha1s as batch of sha1s to it for consumption. Otherwise, + returns the list of discovered sha1s. """ try: self.prepare(*args, **kwargs) except: self.log.error('Unknown repository, skipping...') return [] self.fetch_data() - return self.type_to_ids[b'blob'] + data = self.type_to_ids[b'blob'] - -class GitSha1RemoteReaderAndSendToQueue(GitSha1RemoteReader): - """Read sha1 git from a remote repository and dump only repository's - content sha1 as list and send batch of those sha1s to a celery - queue for consumption. - - """ - def load(self, *args, **kwargs): - """Retrieve the list of sha1s for a particular origin and send those - sha1s as group of sha1s to a specific queue. - - """ - data = super().load(*args, **kwargs) + if not self.task_destination: # to stdout + return data from swh.scheduler.celery_backend.config import app try: # optional dependency from swh.storage.archiver import tasks # noqa except ImportError: pass from celery import group task_destination = app.tasks[self.task_destination] groups = [] for ids in utils.grouper(data, self.batch_size): sig_ids = task_destination.s(destination=self.destination, batch=list(ids)) groups.append(sig_ids) group(groups).delay() return data @click.command() @click.option('--origin-url', help='Origin\'s url') -@click.option('--send/--nosend', default=False, help='Origin\'s url') -def main(origin_url, send): +def main(origin_url, source): logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) - if send: - loader = GitSha1RemoteReaderAndSendToQueue() - ids = loader.load(origin_url) - print('%s sha1s were sent to queue' % len(ids)) - return - loader = GitSha1RemoteReader() - ids = loader.load(origin_url) + ids = loader.load(origin_url, source) if ids: + count = 0 for oid in ids: - print(hashutil.hash_to_hex(oid)) + print(oid) + count += 1 + print("sha1s: %s" % count) if __name__ == '__main__': main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 6b979de..6373378 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,53 +1,53 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dateutil.parser from swh.scheduler.task import Task from .loader import GitLoader from .updater import BulkUpdater -from .reader import GitSha1RemoteReaderAndSendToQueue +from .reader import GitSha1RemoteReader # TODO: rename to LoadRemoteGitRepository class UpdateGitRepository(Task): """Import a git repository from a remote location""" task_queue = 'swh_loader_git' def run(self, repo_url, base_url=None): """Import a git repository""" loader = BulkUpdater() loader.log = self.log return loader.load(repo_url, base_url) class LoadDiskGitRepository(Task): """Import a git repository from disk""" task_queue = 'swh_loader_git' def run(self, origin_url, directory, date): """Import a git repository, cloned in `directory` from `origin_url` at `date`.""" loader = GitLoader() loader.log = self.log return loader.load(origin_url, directory, dateutil.parser.parse(date)) class ReaderGitRepository(Task): task_queue = 'swh_reader_git' def run(self, repo_url, base_url=None): """Read a git repository from a remote location and send sha1 to archival. """ - loader = GitSha1RemoteReaderAndSendToQueue() + loader = GitSha1RemoteReader() loader.log = self.log return loader.load(repo_url) diff --git a/swh/loader/git/updater.py b/swh/loader/git/updater.py index 12d9ce8..20eca63 100644 --- a/swh/loader/git/updater.py +++ b/swh/loader/git/updater.py @@ -1,437 +1,422 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime from io import BytesIO import logging import os import pickle import sys from collections import defaultdict import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from urllib.parse import urlparse from swh.core import hashutil from . import base, converters class SWHRepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, origin_id, occurrences=None): self.storage = storage self._parents_cache = {} self._type_cache = {} if origin_id: self.heads = set(self._cache_heads(origin_id, occurrences)) else: self.heads = set() def _fill_parents_cache(self, commit): """When querying for a commit's parents, we fill the cache to a depth of 100 commits.""" root_rev = hashutil.bytehex_to_hash(commit) for rev, parents in self.storage.revision_shortlog([root_rev], 100): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] def _cache_heads(self, origin_id, occurrences): """Return all the known head commits for `origin_id`""" if not occurrences: occurrences = self.storage.occurrence_get(origin_id) return self._decode_from_storage( occurrence['target'] for occurrence in occurrences ) def get_parents(self, commit): """get the parent commits for `commit`""" if commit not in self._parents_cache: self._fill_parents_cache(commit) return self._parents_cache.get(commit, []) def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): return self.storage.object_find_by_sha1_git( self._encode_for_storage(objects)) def find_remote_ref_types_in_swh(self, remote_refs): """Parse the remote refs information and list the objects that exist in Software Heritage. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class BulkUpdater(base.BaseLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git-updater.ini' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), - 'pack_storage_base': ('str', ''), } def __init__(self, repo_representation=SWHRepoRepresentation): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ super().__init__() self.repo_representation = repo_representation - def store_pack_and_refs(self, pack_buffer, remote_refs): - """Store a pack for archival""" - - write_size = 8192 - - origin_id = "%08d" % self.origin_id - year = str(self.fetch_date.year) - - pack_dir = os.path.join( - self.config['pack_storage_base'], - origin_id, - year, - ) - pack_name = "%s.pack" % self.fetch_date.isoformat() - refs_name = "%s.refs" % self.fetch_date.isoformat() - - os.makedirs(pack_dir, exist_ok=True) - with open(os.path.join(pack_dir, pack_name), 'xb') as f: - while True: - r = pack_buffer.read(write_size) - if not r: - break - f.write(r) - - pack_buffer.seek(0) - - with open(os.path.join(pack_dir, refs_name), 'xb') as f: - pickle.dump(remote_refs, f) - def fetch_pack_from_origin(self, origin_url, base_origin_id, base_occurrences, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation(self.storage, base_origin_id, base_occurrences) parsed_uri = urlparse(origin_url) path = parsed_uri.path if not path.endswith('.git'): path += '.git' client = dulwich.client.TCPGitClient(parsed_uri.netloc, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data, pack_buffer=pack_buffer, limit=size_limit, origin_url=origin_url): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, limit, cur_size, would_write)) pack_buffer.write(data) remote_refs = client.fetch_pack(path.encode('ascii'), base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) - if self.config['pack_storage_base']: - self.store_pack_and_refs(pack_buffer, remote_refs) - return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare(self, origin_url, base_url=None): origin = converters.origin_url_to_origin(origin_url) base_origin = converters.origin_url_to_origin(base_url) base_occurrences = [] base_origin_id = origin_id = None db_origin = self.storage.origin_get(origin) if db_origin: base_origin_id = origin_id = db_origin['id'] if origin_id: base_occurrences = self.storage.occurrence_get(origin_id) if base_url and not base_occurrences: base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_id = base_origin['id'] base_occurrences = self.storage.occurrence_get(base_origin_id) self.base_occurrences = list(sorted(base_occurrences, key=lambda occ: occ['branch'])) self.base_origin_id = base_origin_id self.origin = origin def get_origin(self): return self.origin def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() - self.fetch_date = datetime.datetime.now(tz=datetime.timezone.utc) - fetch_info = self.fetch_pack_from_origin( self.origin['url'], self.base_origin_id, self.base_occurrences, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] origin_url = self.origin['url'] self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids + def save_data(self): + """Store a pack for archival""" + + write_size = 8192 + pack_dir = self.get_save_data_path() + + pack_name = "%s.pack" % self.fetch_date.isoformat() + refs_name = "%s.refs" % self.fetch_date.isoformat() + + with open(os.path.join(pack_dir, pack_name), 'xb') as f: + while True: + r = self.pack_buffer.read(write_size) + if not r: + break + f.write(r) + + self.pack_buffer.seek(0) + + with open(os.path.join(pack_dir, refs_name), 'xb') as f: + pickle.dump(self.remote_refs, f) + def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) def get_contents(self): """Format the blobs from the git repository as swh contents""" max_content_size = self.config['content_size_limit'] for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content( raw_obj, log=self.log, max_content_size=max_content_size, origin_id=self.origin_id) def has_directories(self): return bool(self.type_to_ids[b'tree']) def get_directories(self): """Format the trees as swh directories""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self): return bool(self.type_to_ids[b'commit']) def get_revisions(self): """Format commits as swh revisions""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self): return bool(self.type_to_ids[b'tag']) def get_releases(self): """Retrieve all the release objects from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def has_occurrences(self): return bool(self.remote_refs) def get_occurrences(self): origin_id = self.origin_id visit = self.visit ret = [] for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() ret_ref.update({ 'branch': ref, 'origin': origin_id, 'visit': visit, }) if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) ret.append(ret_ref) return ret def get_fetch_history_result(self): return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), 'occurrences': len(self.remote_refs), } def eventful(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" current_occurrences = list(sorted( self.storage.occurrence_get(self.origin_id), key=lambda occ: occ['branch'], )) return self.base_occurrences != current_occurrences if __name__ == '__main__': logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) bulkupdater = BulkUpdater() origin_url = sys.argv[1] base_url = origin_url if len(sys.argv) > 2: base_url = sys.argv[2] print(bulkupdater.load(origin_url, base_url)) diff --git a/version.txt b/version.txt index 9ae8190..e9a296b 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.22-0-gae4606d \ No newline at end of file +v0.0.23-0-gd6c853f \ No newline at end of file