diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -6,6 +6,7 @@
 import logging
 import iso8601
 
+from datetime import timedelta
 from urllib import parse
 
 from swh.lister.bitbucket.models import BitBucketModel
@@ -14,7 +15,6 @@
 
 logger = logging.getLogger(__name__)
 
-
 DEFAULT_BITBUCKET_PAGE = 10
 BITBUCKET_STARTING_TIME = '2008-01-01T00:00:00Z'
 
@@ -28,9 +28,20 @@
     def __init__(self, api_baseurl, override_config=None, per_page=100):
         super().__init__(
             api_baseurl=api_baseurl, override_config=override_config)
-        if per_page != DEFAULT_BITBUCKET_PAGE:
-            self.PATH_TEMPLATE = '%s&pagelen=%s' % (
-                self.PATH_TEMPLATE, per_page)
+        self.per_page = per_page
+
+    def request_params(self, identifier):
+        """Deal properly with extra api call query parameters.
+
+        This installs the `pagelen` query parameter in charge of quantity of
+        repositories to return per api call.
+
+        """
+        params = super().request_params(identifier)
+        query_params = params.get('params', {})  # future query parameters here
+        query_params.update({'pagelen': self.per_page})
+        params['params'] = query_params
+        return params
 
     def get_model_from_repo(self, repo):
         return {
@@ -43,25 +54,61 @@
             'origin_type': repo['scm'],
         }
 
+    def heuristic_next_index(self, index, next_index):
+        """Bitbucket api calls sometimes return the same next link as the
+        current one. This allows to try and overcome this by shifting
+        slightly the next_date to 1 day in the future, preventing to stop
+        the listing altogether.
+
+        Experiments below the time delta of 1 day in the future bear no
+        result.
+
+        Args:
+            index (str): Index (isoformat date string) of current iteration
+            next_index (str): Next Index (isoformat date string) of future
+                iteration
+
+        Returns:
+            Next index shifted by 1 day in the future as an isoformatted
+            date string.
+
+        """
+        delta = timedelta(days=+1)
+        d = iso8601.parse_date(next_index)
+        return (d + delta).isoformat()
+
     def get_next_target_from_response(self, response):
+        """This will read the next link from the api response. It so happens
+        that sometimes, the next link stays the same between consecutive api
+        calls... Thus stopping the listing...
+
+        This tries to work around that by shifting the next index detected
+        to 1 day in the future (creating a hole). Experiments below one day
+        bear no success.
+
+        Returns:
+            next date (isoformatted) to use as pagination index
+
+        """
         body = response.json()
-        if 'next' in body:
-            return parse.unquote(body['next'].split('after=')[1])
+        next_ = body.get('next')
+        if next_ is not None:
+            return parse.unquote(next_.split('after=')[1])
 
     def transport_response_simplified(self, response):
         repos = response.json()['values']
         return [self.get_model_from_repo(repo) for repo in repos]
 
     def db_first_index(self):
-        """For the first time listing, there is no data in db, so fallback to the
-        bitbucket starting year.
+        """For the first time listing, there is no data in db, so fallback to
+        the bitbucket starting year.
 
         """
         return super().db_first_index() or BITBUCKET_STARTING_TIME
 
     def db_last_index(self):
-        """For the first time listing, there is no data in db, so fallback to the time
-        of the first run as max date.
+        """For the first time listing, there is no data in db, so fallback to
+        the time of the first run as max date.
 
         """
         return super().db_last_index() or datetime.datetime.now(
diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -183,6 +183,33 @@
         for repo in deleted_repos:
             repo.task_id = None
 
+    def heuristic_next_index(self, index, next_index):
+        """Compute heuristically a next index. This function is called when a
+        specific event occurs.
+
+        That specific event is an iteration loop where the next index
+        computed is the same as the current one. It's often ok and in that
+        case, the listing is done.
+
+        But sometimes an api (e.g. bitbucket) can return the same next link. In
+        effect, stopping the listing when it's not actually complete.
+
+        This allows, per lister, to implement a specific behavior for that
+        specific event.
+
+        By default, this returns the `next_index` as is.
+
+        Args:
+            index (Option[int,str]): Current iteration index
+            next_index (Option[int,str]): Next index to use for the next
+                iteration
+
+        Returns:
+            next_index to use for the next iteration (by default, same as input)
+
+        """
+        return next_index
+
     def run(self, min_bound=None, max_bound=None):
         """Main entry function. Sequentially fetches repository data
         from the service according to the basic outline in the class
@@ -213,17 +240,26 @@
                 self.disable_deleted_repo_tasks(index, next_index, keep_these)
 
                 # termination condition
-                if next_index is None or next_index == index:
-                    logger.info('stopping after index %s, no next link found' %
+                if next_index is None:
+                    logger.info('No next link found, stopping after index %s',
                                 index)
                     return
+                # potential termination condition
+                if next_index == index:
+                    logger.warning(
+                        'Next link found same as current one %s',
+                        next_index)
+                    next_index = self.heuristic_next_index(
+                        index, next_index)
+                    if index == next_index:  # stopping
+                        return
                 index = next_index
                 logger.debug('Index: %s', index)
                 yield i
 
         for i in ingest_indexes():
-            if (i % 20) == 0:
-                logger.info('flushing updates at index %s', i)
+            if (i % self.flush_packet_db) == 0:
+                logger.debug('Flushing updates at index %s', i)
                 self.db_session.commit()
                 self.db_session = self.mk_session()
 