diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -4,12 +4,15 @@ import abc import logging +from itertools import count from sqlalchemy import func from .lister_transports import SWHListerHttpTransport from .lister_base import SWHListerBase +logger = logging.getLogger(__name__) + class SWHIndexingLister(SWHListerBase): """Lister* intermediate class for any service that follows the pattern: @@ -165,39 +168,33 @@ Returns: nothing """ - index = min_bound or '' - loop_count = 0 self.min_index = min_bound self.max_index = max_bound - while self.is_within_bounds(index, self.min_index, self.max_index): - logging.info('listing repos starting at %s' % index) - - response, injected_repos = self.ingest_data(index) - if not response and not injected_repos: - logging.info('No response from api server, stopping') - break - - next_index = self.get_next_target_from_response(response) - - # Determine if any repos were deleted, and disable their tasks. - - keep_these = [k for k in injected_repos.keys()] - self.disable_deleted_repo_tasks(index, next_index, keep_these) - - # termination condition - - if (next_index is None) or (next_index == index): - logging.info('stopping after index %s, no next link found' % - index) - break - else: + def ingest_indexes(): + index = min_bound or '' + for i in count(1): + response, injected_repos = self.ingest_data(index) + if not response and not injected_repos: + logger.info('No response from api server, stopping') + return + + next_index = self.get_next_target_from_response(response) + # Determine if any repos were deleted, and disable their tasks. + keep_these = list(injected_repos.keys()) + self.disable_deleted_repo_tasks(index, next_index, keep_these) + + # termination condition + if next_index is None or next_index == index: + logger.info('stopping after index %s, no next link found' % + index) + return index = next_index + yield i - loop_count += 1 - if loop_count == 20: - logging.info('flushing updates') - loop_count = 0 + for i in ingest_indexes(): + if (i % 20) == 0: + logger.info('flushing updates') self.db_session.commit() self.db_session = self.mk_session()