diff --git a/resources/svn.ini b/resources/svn.ini
index 9eefe96..d293d44 100644
--- a/resources/svn.ini
+++ b/resources/svn.ini
@@ -1,14 +1,23 @@
 [main]
 storage_class = remote_storage
 storage_args = http://localhost:5000/
 send_contents = True
 send_directories = True
 send_revisions = True
 send_releases = True
 send_occurrences = True
+# max number of contents to send to storage (if size threshold not reached first)
 content_packet_size = 10000
+# 100 MiB of content data (size threshold before sending to storage)
+content_packet_block_size_bytes = 104857600
+# limit of the swh content storage for one blob (beyond this limit, the
+# content's data is not sent to storage)
 content_packet_size_bytes = 1073741824
+# packet of directories to send to storage
 directory_packet_size = 25000
-revision_packet_size = 100
+# packet of revisions to send to storage
+revision_packet_size = 10000
+# packet of releases to send to storage
 release_packet_size = 100000
+# packet of occurrences to send to storage
 occurrence_packet_size = 100000
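The two byte-oriented settings above are easy to conflate: content_packet_block_size_bytes is a batching threshold (pending contents are sent once their accumulated length reaches 100 MiB), while content_packet_size_bytes caps a single blob (a larger blob is registered without its data). A minimal sketch of the batching decision, assuming only that queued elements carry a 'length' field as they do in the loader code below:

    # Illustrative sketch; constant names mirror svn.ini, the element shape
    # ({'length': ...}) is the one used by the loader's content queue.
    CONTENT_PACKET_SIZE = 10000                           # max nb of contents
    CONTENT_PACKET_BLOCK_SIZE_BYTES = 100 * 1024 * 1024   # == 104857600

    def should_send(queued):
        """Send the batch once either threshold is reached."""
        return (len(queued) >= CONTENT_PACKET_SIZE
                or sum(e['length'] for e in queued)
                >= CONTENT_PACKET_BLOCK_SIZE_BYTES)

    assert should_send([{'length': 104857600}])        # one 100 MiB blob
    assert not should_send([{'length': 1024}] * 9999)  # small blobs keep queueing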
""" - def __init__(self, config, revision_type, logging_class): + def __init__(self, config, revision_type, origin_id, logging_class): self.config = config + self.origin_id = origin_id self.storage = get_storage(config['storage_class'], config['storage_args']) self.revision_type = revision_type self.log = logging.getLogger(logging_class) + self.contents = QueuePerSize(key='sha1', + max_nb_elements=self.config[ + 'content_packet_size'], + max_size=self.config[ + 'content_packet_block_size_bytes']) + + self.directories = QueuePerNbElements(key='id', + max_nb_elements=self.config[ + 'directory_packet_size']) + + self.revisions = QueuePerNbElements(key='id', + max_nb_elements=self.config[ + 'revision_packet_size']) + + self.releases = QueuePerNbElements(key='id', + max_nb_elements=self.config[ + 'release_packet_size']) + + self.occurrences = QueuePerNbElements(key='id', + max_nb_elements=self.config[ + 'occurrence_packet_size']) + l = logging.getLogger('requests.packages.urllib3.connectionpool') l.setLevel(logging.WARN) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_contents(self, content_list): """Actually send properly formatted contents to the database""" num_contents = len(content_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) - self.storage.content_add(content_list) - self.log.debug("Done sending %d contents" % num_contents, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'content', - 'swh_num': num_contents, - 'swh_id': log_id, - }) + if num_contents > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) + self.storage.content_add(content_list) + self.log.debug("Done sending %d contents" % num_contents, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'content', + 'swh_num': num_contents, + 'swh_id': log_id, + }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_directories(self, directory_list): """Actually send properly formatted directories to the database""" num_directories = len(directory_list) - log_id = str(uuid.uuid4()) - self.log.debug("Sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_start', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) - self.storage.directory_add(directory_list) - self.log.debug("Done sending %d directories" % num_directories, - extra={ - 'swh_type': 'storage_send_end', - 'swh_content_type': 'directory', - 'swh_num': num_directories, - 'swh_id': log_id, - }) + if num_directories > 0: + log_id = str(uuid.uuid4()) + self.log.debug("Sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_start', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) + self.storage.directory_add(directory_list) + self.log.debug("Done sending %d directories" % num_directories, + extra={ + 'swh_type': 'storage_send_end', + 'swh_content_type': 'directory', + 'swh_num': num_directories, + 'swh_id': log_id, + }) @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3) def send_revisions(self, revision_list): """Actually send properly formatted revisions to the database""" num_revisions = len(revision_list) - log_id = 
 class SWHLoader(config.SWHConfig):
     """An svn loader.

     This will load the svn repository.

     """
-    def __init__(self, config, revision_type, logging_class):
+    def __init__(self, config, revision_type, origin_id, logging_class):
         self.config = config
+        self.origin_id = origin_id
         self.storage = get_storage(config['storage_class'],
                                    config['storage_args'])

         self.revision_type = revision_type

         self.log = logging.getLogger(logging_class)

+        self.contents = QueuePerSize(key='sha1',
+                                     max_nb_elements=self.config[
+                                         'content_packet_size'],
+                                     max_size=self.config[
+                                         'content_packet_block_size_bytes'])
+
+        self.directories = QueuePerNbElements(key='id',
+                                              max_nb_elements=self.config[
+                                                  'directory_packet_size'])
+
+        self.revisions = QueuePerNbElements(key='id',
+                                            max_nb_elements=self.config[
+                                                'revision_packet_size'])
+
+        self.releases = QueuePerNbElements(key='id',
+                                           max_nb_elements=self.config[
+                                               'release_packet_size'])
+
+        self.occurrences = QueuePerNbElements(key='id',
+                                              max_nb_elements=self.config[
+                                                  'occurrence_packet_size'])
+
         l = logging.getLogger('requests.packages.urllib3.connectionpool')
         l.setLevel(logging.WARN)

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_contents(self, content_list):
         """Actually send properly formatted contents to the database"""
         num_contents = len(content_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d contents" % num_contents,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'content',
-                           'swh_num': num_contents,
-                           'swh_id': log_id,
-                       })
-        self.storage.content_add(content_list)
-        self.log.debug("Done sending %d contents" % num_contents,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'content',
-                           'swh_num': num_contents,
-                           'swh_id': log_id,
-                       })
+        if num_contents > 0:
+            log_id = str(uuid.uuid4())
+            self.log.debug("Sending %d contents" % num_contents,
+                           extra={
+                               'swh_type': 'storage_send_start',
+                               'swh_content_type': 'content',
+                               'swh_num': num_contents,
+                               'swh_id': log_id,
+                           })
+            self.storage.content_add(content_list)
+            self.log.debug("Done sending %d contents" % num_contents,
+                           extra={
+                               'swh_type': 'storage_send_end',
+                               'swh_content_type': 'content',
+                               'swh_num': num_contents,
+                               'swh_id': log_id,
+                           })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_directories(self, directory_list):
         """Actually send properly formatted directories to the database"""
         num_directories = len(directory_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d directories" % num_directories,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'directory',
-                           'swh_num': num_directories,
-                           'swh_id': log_id,
-                       })
-        self.storage.directory_add(directory_list)
-        self.log.debug("Done sending %d directories" % num_directories,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'directory',
-                           'swh_num': num_directories,
-                           'swh_id': log_id,
-                       })
+        if num_directories > 0:
+            log_id = str(uuid.uuid4())
+            self.log.debug("Sending %d directories" % num_directories,
+                           extra={
+                               'swh_type': 'storage_send_start',
+                               'swh_content_type': 'directory',
+                               'swh_num': num_directories,
+                               'swh_id': log_id,
+                           })
+            self.storage.directory_add(directory_list)
+            self.log.debug("Done sending %d directories" % num_directories,
+                           extra={
+                               'swh_type': 'storage_send_end',
+                               'swh_content_type': 'directory',
+                               'swh_num': num_directories,
+                               'swh_id': log_id,
+                           })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_revisions(self, revision_list):
         """Actually send properly formatted revisions to the database"""
         num_revisions = len(revision_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d revisions" % num_revisions,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'revision',
-                           'swh_num': num_revisions,
-                           'swh_id': log_id,
-                       })
-        self.storage.revision_add(revision_list)
-        self.log.debug("Done sending %d revisions" % num_revisions,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'revision',
-                           'swh_num': num_revisions,
-                           'swh_id': log_id,
-                       })
+        if num_revisions > 0:
+            log_id = str(uuid.uuid4())
+            self.log.debug("Sending %d revisions" % num_revisions,
+                           extra={
+                               'swh_type': 'storage_send_start',
+                               'swh_content_type': 'revision',
+                               'swh_num': num_revisions,
+                               'swh_id': log_id,
+                           })
+            self.storage.revision_add(revision_list)
+            self.log.debug("Done sending %d revisions" % num_revisions,
+                           extra={
+                               'swh_type': 'storage_send_end',
+                               'swh_content_type': 'revision',
+                               'swh_num': num_revisions,
+                               'swh_id': log_id,
+                           })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_releases(self, release_list):
         """Actually send properly formatted releases to the database"""
         num_releases = len(release_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d releases" % num_releases,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'release',
-                           'swh_num': num_releases,
-                           'swh_id': log_id,
-                       })
-        self.storage.release_add(release_list)
-        self.log.debug("Done sending %d releases" % num_releases,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'release',
-                           'swh_num': num_releases,
-                           'swh_id': log_id,
-                       })
+        if num_releases > 0:
+            log_id = str(uuid.uuid4())
+            self.log.debug("Sending %d releases" % num_releases,
+                           extra={
+                               'swh_type': 'storage_send_start',
+                               'swh_content_type': 'release',
+                               'swh_num': num_releases,
+                               'swh_id': log_id,
+                           })
+            self.storage.release_add(release_list)
+            self.log.debug("Done sending %d releases" % num_releases,
+                           extra={
+                               'swh_type': 'storage_send_end',
+                               'swh_content_type': 'release',
+                               'swh_num': num_releases,
+                               'swh_id': log_id,
+                           })

     @retry(retry_on_exception=retry_loading, stop_max_attempt_number=3)
     def send_occurrences(self, occurrence_list):
         """Actually send properly formatted occurrences to the database"""
         num_occurrences = len(occurrence_list)
-        log_id = str(uuid.uuid4())
-        self.log.debug("Sending %d occurrences" % num_occurrences,
-                       extra={
-                           'swh_type': 'storage_send_start',
-                           'swh_content_type': 'occurrence',
-                           'swh_num': num_occurrences,
-                           'swh_id': log_id,
-                       })
-        self.storage.occurrence_add(occurrence_list)
-        self.log.debug("Done sending %d occurrences" % num_occurrences,
-                       extra={
-                           'swh_type': 'storage_send_end',
-                           'swh_content_type': 'occurrence',
-                           'swh_num': num_occurrences,
-                           'swh_id': log_id,
-                       })
+        if num_occurrences > 0:
+            log_id = str(uuid.uuid4())
+            self.log.debug("Sending %d occurrences" % num_occurrences,
+                           extra={
+                               'swh_type': 'storage_send_start',
+                               'swh_content_type': 'occurrence',
+                               'swh_num': num_occurrences,
+                               'swh_id': log_id,
+                           })
+            self.storage.occurrence_add(occurrence_list)
+            self.log.debug("Done sending %d occurrences" % num_occurrences,
+                           extra={
+                               'swh_type': 'storage_send_end',
+                               'swh_content_type': 'occurrence',
+                               'swh_num': num_occurrences,
+                               'swh_id': log_id,
+                           })

     def shallow_blob(self, obj):
         return {
             'sha1': obj['sha1'],
             'sha256': obj['sha256'],
             'sha1_git': obj['sha1_git'],
             'length': obj['length']
         }

     def filter_missing_blobs(self, blobs):
         """Filter missing blobs from swh.

         """
+        max_content_size = self.config['content_packet_size_bytes']
         blobs_per_sha1 = {}
         for blob in blobs:
             blobs_per_sha1[blob['sha1']] = blob

         for sha1 in self.storage.content_missing((self.shallow_blob(b)
                                                   for b in blobs),
                                                  key_hash='sha1'):
-            yield blobs_per_sha1[sha1]
+            yield converters.blob_to_content(blobs_per_sha1[sha1],
+                                             max_content_size=max_content_size,
+                                             origin_id=self.origin_id)

-    def bulk_send_blobs(self, blobs, origin_id):
+    def bulk_send_blobs(self, blobs):
         """Format blobs as swh contents and send them to the database"""
-        packet_size = self.config['content_packet_size']
-        packet_size_bytes = self.config['content_packet_size_bytes']
-        max_content_size = self.config['content_size_limit']
-
-        send_in_packets(self.filter_missing_blobs(blobs),
-                        converters.blob_to_content,
-                        self.send_contents, packet_size,
-                        packet_size_bytes=packet_size_bytes,
-                        log=self.log,
-                        max_content_size=max_content_size,
-                        origin_id=origin_id)
+        threshold_reached = self.contents.add(
+            self.filter_missing_blobs(blobs))
+        if threshold_reached:
+            self.send_contents(self.contents.pop())

     def shallow_tree(self, tree):
         return tree['sha1_git']

-    def filter_missing_trees(self, trees):
+    def filter_missing_trees(self, trees, objects):
         """Filter missing trees from swh.

         """
         trees_per_sha1 = {}
         for tree in trees:
             trees_per_sha1[tree['sha1_git']] = tree

         for sha in self.storage.directory_missing((self.shallow_tree(b)
                                                    for b in trees)):
-            yield trees_per_sha1[sha]
+            yield converters.tree_to_directory(trees_per_sha1[sha], objects)

     def bulk_send_trees(self, objects, trees):
         """Format trees as swh directories and send them to the database"""
-        packet_size = self.config['directory_packet_size']
-
-        send_in_packets(self.filter_missing_trees(trees),
-                        converters.tree_to_directory,
-                        self.send_directories, packet_size,
-                        objects=objects,
-                        log=self.log)
+        threshold_reached = self.directories.add(
+            self.filter_missing_trees(trees, objects))
+        if threshold_reached:
+            self.send_contents(self.contents.pop())
+            self.send_directories(self.directories.pop())

     def shallow_commit(self, commit):
         return commit['id']

     def filter_missing_commits(self, commits):
         """Filter missing commits from swh.

         """
         commits_per_sha1 = {}
         for commit in commits:
             commits_per_sha1[commit['id']] = commit

         for sha in self.storage.revision_missing((self.shallow_commit(b)
                                                   for b in commits),
                                                  type=self.revision_type):
             yield commits_per_sha1[sha]

     def bulk_send_commits(self, commits):
         """Format commits as swh revisions and send them to the database.

         """
-        packet_size = self.config['revision_packet_size']
-
-        send_in_packets(self.filter_missing_commits(commits),
-                        (lambda x, objects={}, log=None: x),
-                        self.send_revisions, packet_size,
-                        log=self.log)
+        threshold_reached = self.revisions.add(
+            self.filter_missing_commits(commits))
+        if threshold_reached:
+            self.send_contents(self.contents.pop())
+            self.send_directories(self.directories.pop())
+            self.send_revisions(self.revisions.pop())

     def bulk_send_annotated_tags(self, tags):
         """Format annotated tags (pygit2.Tag objects) as swh releases and
         send them to the database.

         """
-        packet_size = self.config['release_packet_size']
-
-        send_in_packets(tags, (lambda x, objects={}, log=None: x),
-                        self.send_releases, packet_size,
-                        log=self.log)
+        threshold_reached = self.releases.add(tags)
+        if threshold_reached:
+            self.send_contents(self.contents.pop())
+            self.send_directories(self.directories.pop())
+            self.send_revisions(self.revisions.pop())
+            self.send_releases(self.releases.pop())

     def bulk_send_refs(self, refs):
         """Format git references as swh occurrences and send them to the
         database.

         """
-        packet_size = self.config['occurrence_packet_size']
-        send_in_packets(refs, converters.ref_to_occurrence,
-                        self.send_occurrences, packet_size)
-
-    def maybe_load_contents(self, contents, origin_id):
+        threshold_reached = self.occurrences.add(
+            map(converters.ref_to_occurrence, refs))
+        if threshold_reached:
+            self.send_contents(self.contents.pop())
+            self.send_directories(self.directories.pop())
+            self.send_revisions(self.revisions.pop())
+            self.send_releases(self.releases.pop())
+            self.send_occurrences(self.occurrences.pop())
+
+    def maybe_load_contents(self, contents):
""" + max_content_size = self.config['content_packet_size_bytes'] blobs_per_sha1 = {} for blob in blobs: blobs_per_sha1[blob['sha1']] = blob for sha1 in self.storage.content_missing((self.shallow_blob(b) for b in blobs), key_hash='sha1'): - yield blobs_per_sha1[sha1] + yield converters.blob_to_content(blobs_per_sha1[sha1], + max_content_size=max_content_size, + origin_id=self.origin_id) - def bulk_send_blobs(self, blobs, origin_id): + def bulk_send_blobs(self, blobs): """Format blobs as swh contents and send them to the database""" - packet_size = self.config['content_packet_size'] - packet_size_bytes = self.config['content_packet_size_bytes'] - max_content_size = self.config['content_size_limit'] - - send_in_packets(self.filter_missing_blobs(blobs), - converters.blob_to_content, - self.send_contents, packet_size, - packet_size_bytes=packet_size_bytes, - log=self.log, - max_content_size=max_content_size, - origin_id=origin_id) + threshold_reached = self.contents.add( + self.filter_missing_blobs(blobs)) + if threshold_reached: + self.send_contents(self.contents.pop()) def shallow_tree(self, tree): return tree['sha1_git'] - def filter_missing_trees(self, trees): + def filter_missing_trees(self, trees, objects): """Filter missing tree from swh. """ trees_per_sha1 = {} for tree in trees: trees_per_sha1[tree['sha1_git']] = tree for sha in self.storage.directory_missing((self.shallow_tree(b) for b in trees)): - yield trees_per_sha1[sha] + yield converters.tree_to_directory(trees_per_sha1[sha], objects) def bulk_send_trees(self, objects, trees): """Format trees as swh directories and send them to the database""" - packet_size = self.config['directory_packet_size'] - - send_in_packets(self.filter_missing_trees(trees), - converters.tree_to_directory, - self.send_directories, packet_size, - objects=objects, - log=self.log) + threshold_reached = self.directories.add( + self.filter_missing_trees(trees, objects)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) def shallow_commit(self, commit): return commit['id'] def filter_missing_commits(self, commits): """Filter missing commit from swh. """ commits_per_sha1 = {} for commit in commits: commits_per_sha1[commit['id']] = commit for sha in self.storage.revision_missing((self.shallow_commit(b) for b in commits), type=self.revision_type): yield commits_per_sha1[sha] def bulk_send_commits(self, commits): """Format commits as swh revisions and send them to the database. """ - packet_size = self.config['revision_packet_size'] - - send_in_packets(self.filter_missing_commits(commits), - (lambda x, objects={}, log=None: x), - self.send_revisions, packet_size, - log=self.log) + threshold_reached = self.revisions.add( + self.filter_missing_commits(commits)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) def bulk_send_annotated_tags(self, tags): """Format annotated tags (pygit2.Tag objects) as swh releases and send them to the database. 
""" - packet_size = self.config['release_packet_size'] - - send_in_packets(tags, (lambda x, objects={}, log=None: x), - self.send_releases, packet_size, - log=self.log) + threshold_reached = self.releases.add(tags) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) + self.send_releases(self.releases.pop()) def bulk_send_refs(self, refs): """Format git references as swh occurrences and send them to the database. """ - packet_size = self.config['occurrence_packet_size'] - send_in_packets(refs, converters.ref_to_occurrence, - self.send_occurrences, packet_size) - - def maybe_load_contents(self, contents, origin_id): + threshold_reached = self.occurrences.add( + map(converters.ref_to_occurrence, refs)) + if threshold_reached: + self.send_contents(self.contents.pop()) + self.send_directories(self.directories.pop()) + self.send_revisions(self.revisions.pop()) + self.send_releases(self.releases.pop()) + self.send_occurrences(self.occurrences.pop()) + + def maybe_load_contents(self, contents): if self.config['send_contents']: - self.bulk_send_blobs(contents, origin_id) + self.bulk_send_blobs(contents) else: self.log.info('Not sending contents') def maybe_load_directories(self, trees, objects_per_path): if self.config['send_directories']: self.bulk_send_trees(objects_per_path, trees) else: self.log.info('Not sending directories') def maybe_load_revisions(self, revisions): if self.config['send_revisions']: self.bulk_send_commits(revisions) else: self.log.info('Not sending revisions') def maybe_load_releases(self, releases): if self.config['send_releases']: self.bulk_send_annotated_tags(releases) else: self.log.info('Not sending releases') def maybe_load_occurrences(self, occurrences): if self.config['send_occurrences']: self.bulk_send_refs(occurrences) else: self.log.info('Not sending occurrences') - def load(self, objects_per_type, objects_per_path, origin_id): - self.maybe_load_contents(objects_per_type[GitType.BLOB], - origin_id) + def load(self, objects_per_type, objects_per_path): + self.maybe_load_contents(objects_per_type[GitType.BLOB]) self.maybe_load_directories(objects_per_type[GitType.TREE], objects_per_path) self.maybe_load_revisions(objects_per_type[GitType.COMM]) self.maybe_load_releases(objects_per_type[GitType.RELE]) self.maybe_load_occurrences(objects_per_type[GitType.REFS]) + + def flush(self): + if self.config['send_contents']: + self.send_contents(self.contents.pop()) + if self.config['send_directories']: + self.send_directories(self.directories.pop()) + if self.config['send_revisions']: + self.send_revisions(self.revisions.pop()) + if self.config['send_occurrences']: + self.send_occurrences(self.occurrences.pop()) + if self.config['send_releases']: + self.send_releases(self.releases.pop()) diff --git a/swh/loader/svn/loader.py b/swh/loader/svn/loader.py index 597c9f9..23476db 100644 --- a/swh/loader/svn/loader.py +++ b/swh/loader/svn/loader.py @@ -1,173 +1,177 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.core import utils -from swh.model import git +from swh.model import git, hashutil from swh.model.git import GitType from swh.loader.svn import libloader, svn, converters class SvnLoader(libloader.SWHLoader): """Svn loader to load one svn repository. 
""" - def __init__(self, config): + def __init__(self, config, origin_id): super().__init__(config, revision_type='svn', + origin_id=origin_id, logging_class='swh.loader.svn.SvnLoader') def check_history_not_altered(self, svnrepo, revision_start, swh_rev): """Given a svn repository, check if the history was not tampered with. """ revision_id = swh_rev['id'] parents = swh_rev['parents'] hash_data_per_revs = svnrepo.swh_hash_data_per_revision(revision_start, revision_start) rev, _, commit, objects_per_path = list(hash_data_per_revs)[0] dir_id = objects_per_path[git.ROOT_TREE_KEY][0]['sha1_git'] swh_revision = converters.build_swh_revision(svnrepo.uuid, commit, rev, dir_id, parents) swh_revision_id = git.compute_revision_sha1_git(swh_revision) return swh_revision_id == revision_id def process_revisions(self, svnrepo, revision_start, revision_end, revision_parents): """Process revisions from revision_start to revision_end and send to swh for storage. At each svn revision, checkout the repository, compute the tree hash and blobs and send for swh storage to store. Then computes and yields the swh revision. Yields: swh revision """ for rev, nextrev, commit, objects_per_path in svnrepo.swh_hash_data_per_revision( # noqa revision_start, revision_end): objects_per_type = { GitType.BLOB: [], GitType.TREE: [], GitType.COMM: [], GitType.RELE: [], GitType.REFS: [], } # compute the fs tree's checksums dir_id = objects_per_path[git.ROOT_TREE_KEY][0]['sha1_git'] swh_revision = converters.build_swh_revision(svnrepo.uuid, commit, rev, dir_id, revision_parents[rev]) swh_revision['id'] = git.compute_revision_sha1_git(swh_revision) - # self.log.debug('svnrev: %s, swhrev: %s, nextsvnrev: %s' % ( - # rev, swh_revision['id'], nextrev)) + self.log.debug('rev: %s, swhrev: %s' % ( + rev, hashutil.hash_to_hex(swh_revision['id']))) if nextrev: revision_parents[nextrev] = [swh_revision['id']] # send blobs for tree_path in objects_per_path: objs = objects_per_path[tree_path] for obj in objs: objects_per_type[obj['type']].append(obj) - self.load(objects_per_type, objects_per_path, svnrepo.origin_id) + self.load(objects_per_type, objects_per_path) yield swh_revision def process(self, svn_url, origin, destination_path): """Load a svn repository in swh. Checkout the svn repository locally in destination_path. Args: - svn_url: svn repository url to import - origin: Dictionary origin - id: origin's id - url: url origin we fetched - type: type of the origin Returns: Dictionary with the following keys: - status: mandatory, the status result as a boolean - stderr: optional when status is True, mandatory otherwise """ svnrepo = svn.SvnRepo(svn_url, origin['id'], self.storage, destination_path) try: swh_rev = svnrepo.swh_previous_revision() if swh_rev: extra_headers = dict(swh_rev['metadata']['extra_headers']) revision_start = extra_headers['svn_revision'] revision_parents = { revision_start: swh_rev['parents'] } else: revision_start = 1 revision_parents = { revision_start: [] } svnrepo.fork(revision_start) self.log.debug('svn co %s@%s' % (svn_url, revision_start)) if swh_rev and not self.check_history_not_altered(svnrepo, revision_start, swh_rev): msg = 'History of svn %s@%s history modified. Skipping...' % ( svn_url, revision_start) self.log.warn(msg) return {'status': False, 'stderr': msg} revision_end = svnrepo.head_revision() self.log.debug('[revision_start-revision_end]: [%s-%s]' % ( revision_start, revision_end)) if revision_start == revision_end and revision_start is not 1: self.log.info('%s@%s already injected.' 
diff --git a/swh/loader/svn/tasks.py b/swh/loader/svn/tasks.py
index caea022..92b9165 100644
--- a/swh/loader/svn/tasks.py
+++ b/swh/loader/svn/tasks.py
@@ -1,97 +1,100 @@
 # Copyright (C) 2015 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from swh.core.config import load_named_config
 from swh.scheduler.task import Task
 from swh.storage import get_storage

 from swh.model.git import GitType
 from swh.loader.svn.loader import SvnLoader


 DEFAULT_CONFIG = {
     'storage_class': ('str', 'remote_storage'),
     'storage_args': ('list[str]', ['http://localhost:5000/']),
     'send_contents': ('bool', True),
     'send_directories': ('bool', True),
     'send_revisions': ('bool', True),
     'send_releases': ('bool', True),
     'send_occurrences': ('bool', True),
     'content_packet_size': ('int', 10000),
-    'content_packet_size_bytes': ('int', 1073741824),
+    'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024),
+    'content_packet_size_bytes': ('int', 1 * 1024 * 1024 * 1024),
     'directory_packet_size': ('int', 25000),
     'revision_packet_size': ('int', 100),
     'release_packet_size': ('int', 100000),
     'occurrence_packet_size': ('int', 100000),
 }


 class LoadSvnRepositoryTsk(Task):
     """Import an svn repository into Software Heritage

     """
     task_queue = 'swh_loader_svn'
+ + """ + def __init__(self, max_nb_elements, key): + self.reset() + self.max_nb_elements = max_nb_elements + self.key = key + self.keys = set() + + def _add_element(self, e): + k = e[self.key] + if k not in self.keys: + self.keys.add(k) + self.elements.append(e) + self.count += 1 + + def add(self, elements): + for e in elements: + self._add_element(e) + return self.count >= self.max_nb_elements + + def size(self): + return self.count + + def pop(self): + elements = self.elements + self.reset() + return elements + + def reset(self): + self.elements = [] + self.keys = set() + self.count = 0 diff --git a/swh/loader/svn/tasks.py b/swh/loader/svn/tasks.py index caea022..92b9165 100644 --- a/swh/loader/svn/tasks.py +++ b/swh/loader/svn/tasks.py @@ -1,97 +1,100 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.core.config import load_named_config from swh.scheduler.task import Task from swh.storage import get_storage from swh.model.git import GitType from swh.loader.svn.loader import SvnLoader DEFAULT_CONFIG = { 'storage_class': ('str', 'remote_storage'), 'storage_args': ('list[str]', ['http://localhost:5000/']), 'send_contents': ('bool', True), 'send_directories': ('bool', True), 'send_revisions': ('bool', True), 'send_releases': ('bool', True), 'send_occurrences': ('bool', True), 'content_packet_size': ('int', 10000), - 'content_packet_size_bytes': ('int', 1073741824), + 'content_packet_block_size_bytes': ('int', 100 * 1024 * 1024), + 'content_packet_size_bytes': ('int', 1 * 1024 * 1024 * 1024), 'directory_packet_size': ('int', 25000), 'revision_packet_size': ('int', 100), 'release_packet_size': ('int', 100000), 'occurrence_packet_size': ('int', 100000), } class LoadSvnRepositoryTsk(Task): """Import a svn repository to Software Heritage """ task_queue = 'swh_loader_svn' @property def config(self): if not hasattr(self, '__config'): self.__config = load_named_config( 'loader/svn.ini', DEFAULT_CONFIG) l = logging.getLogger('requests.packages.urllib3.connectionpool') l.setLevel(logging.WARN) return self.__config def open_fetch_history(self, storage, origin_id): return storage.fetch_history_start(origin_id) def close_fetch_history(self, storage, fetch_history_id, res): result = None if 'objects' in res: result = { 'contents': len(res['objects'].get(GitType.BLOB, [])), 'directories': len(res['objects'].get(GitType.TREE, [])), 'revisions': len(res['objects'].get(GitType.COMM, [])), 'releases': len(res['objects'].get(GitType.RELE, [])), 'occurrences': len(res['objects'].get(GitType.REFS, [])), } data = { 'status': res['status'], 'result': result, 'stderr': res.get('stderr') } return storage.fetch_history_end(fetch_history_id, data) def run(self, svn_url, local_path): """Import a svn repository. Args: cf. 
     def run(self, svn_url, local_path):
         """Import an svn repository.

         Args:
             cf. swh.loader.svn.SvnLoader.process docstring

         """
         config = self.config
         storage = get_storage(config['storage_class'], config['storage_args'])

         origin = {'type': 'svn', 'url': svn_url}
         origin['id'] = storage.origin_add_one(origin)

         fetch_history_id = self.open_fetch_history(storage, origin['id'])

         # try:
-        result = SvnLoader(config).process(svn_url, origin, local_path)
+        result = SvnLoader(config, origin['id']).process(svn_url,
+                                                         origin,
+                                                         local_path)
         # except:
         #     e_info = sys.exc_info()
         #     self.log.error('Problem during svn load for repo %s - %s' % (
         #         svn_url, e_info[1]))
         #     result = {'status': False, 'stderr': 'reason:%s\ntrace:%s' % (
         #         e_info[1],
         #         ''.join(traceback.format_tb(e_info[2])))}

         self.close_fetch_history(storage, fetch_history_id, result)
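End to end, the change moves origin_id from the process()/load() call chain to the loader's constructor and makes an explicit flush() part of every successful load. A condensed sketch of the resulting call sequence, stitched together from tasks.py and loader.py above (the repository URL and checkout path are placeholders):

    from swh.core.config import load_named_config
    from swh.storage import get_storage
    from swh.loader.svn.loader import SvnLoader
    from swh.loader.svn.tasks import DEFAULT_CONFIG

    config = load_named_config('loader/svn.ini', DEFAULT_CONFIG)
    storage = get_storage(config['storage_class'], config['storage_args'])

    origin = {'type': 'svn', 'url': 'svn://example.org/repo'}
    origin['id'] = storage.origin_add_one(origin)

    # origin_id is now a constructor argument; process() batches objects
    # through the queues and flushes whatever remains before returning.
    result = SvnLoader(config, origin['id']).process(origin['url'], origin,
                                                     '/tmp/swh.loader.svn')
    assert result['status'] in (True, False)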