diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py
--- a/swh/lister/bitbucket/lister.py
+++ b/swh/lister/bitbucket/lister.py
@@ -6,6 +6,7 @@
 import logging
 import iso8601
 
+from datetime import timedelta
 from urllib import parse
 
 from swh.lister.bitbucket.models import BitBucketModel
@@ -30,6 +31,16 @@
         if per_page != DEFAULT_BITBUCKET_PAGE:
             self.PATH_TEMPLATE = '%s&pagelen=%s' % (
                 self.PATH_TEMPLATE, per_page)
+        # hack: determine the incremental lister's starting point
+        try:
+            self.last_index = super().db_last_index()
+        except Exception:
+            self.last_index = None
+        # variable used to work around the pagination caveat
+        self.last_seen_index = None
+        # stay consistent with the prior behavior (20 * 10 repositories then)
+        self.flush_packet_db = int(
+            (self.flush_packet_db * DEFAULT_BITBUCKET_PAGE) / per_page)
 
     def get_model_from_repo(self, repo):
         return {
@@ -42,28 +53,62 @@
             'origin_type': repo['scm'],
         }
 
+    def _heuristic_next_index(self, date_index):
+        """Shift the next index date by 1 day into the future.
+
+        Args:
+            date_index (datetime): The current date used as index
+
+        Returns:
+            datetime shifted 1 day into the future
+
+        """
+        delta = timedelta(days=+1)
+        return date_index + delta
+
     def get_next_target_from_response(self, response):
+        """Read the next link from the api response. It happens that the
+        next link sometimes stays the same between consecutive api calls,
+        which stalls the listing.
+
+        This works around the issue by shifting the detected next index
+        1 day into the future (creating a hole). Experiments with shifts
+        below one day were unsuccessful.
+
+        Returns:
+            next date (isoformatted) to use as pagination index
+
+        """
         body = response.json()
-        if 'next' in body:
-            return parse.unquote(body['next'].split('after=')[1])
+        next_ = body.get('next')
+
+        if next_ is not None:
+            next_date = iso8601.parse_date(
+                parse.unquote(next_.split('after=')[1]))
+            if self.last_seen_index is not None and \
+               self.last_seen_index >= next_date:
+                next_date = self._heuristic_next_index(next_date)
+
+            self.last_seen_index = next_date
+            return next_date.isoformat()
 
     def transport_response_simplified(self, response):
         repos = response.json()['values']
         return [self.get_model_from_repo(repo) for repo in repos]
 
     def db_first_index(self):
-        """For the first time listing, there is no data in db, so fallback to the
-        bitbucket starting year.
+        """For the first time listing, there is no data in db, so fall back to
+        the bitbucket starting year.
 
         """
         return super().db_first_index() or '2008-01-01T00:00:00Z'
 
     def db_last_index(self):
-        """For the first time listing, there is no data in db, so fallback to the time
-        of the first run as max date.
+        """For the first time listing (full lister), there is no data in db, so
+        fall back to the time of the first run as max date.
 
         """
-        return super().db_last_index() or datetime.datetime.now(
+        return self.last_index or datetime.datetime.now(
             tz=datetime.timezone.utc).isoformat()
 
     def request_uri(self, identifier):
diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py
--- a/swh/lister/bitbucket/tasks.py
+++ b/swh/lister/bitbucket/tasks.py
@@ -19,7 +19,7 @@
 @app.task(name=__name__ + '.IncrementalBitBucketLister')
 def incremental_bitbucket_lister(**lister_args):
     lister = new_lister(**lister_args)
-    lister.run(min_bound=lister.db_last_index(), max_bound=None)
+    lister.run(min_bound=lister.last_index, max_bound=None)
 
 
 @app.task(name=__name__ + '.RangeBitBucketLister')
diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2015-2017 the Software Heritage developers
+# Copyright (C) 2015-2019 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
@@ -16,6 +16,8 @@
 
 
 class IndexingLister(ListerBase):
+    flush_packet_db = 20
+
     """Lister* intermediate class for any service that follows the pattern:
 
     - The service must report at least one stable unique identifier, known
@@ -148,8 +150,6 @@
         t = self.db_session.query(func.min(self.MODEL.indexable)).first()
         if t:
             return t[0]
-        else:
-            return None
 
     def db_last_index(self):
         """Look in the db for the largest indexable value
@@ -160,8 +160,6 @@
         t = self.db_session.query(func.max(self.MODEL.indexable)).first()
         if t:
             return t[0]
-        else:
-            return None
 
     def disable_deleted_repo_tasks(self, start, end, keep_these):
         """Disable tasks for repos that no longer exist between start and end.
@@ -222,11 +220,12 @@
                                 index)
                     return
                 index = next_index
+                logger.debug('Index: %s', index)
                 yield i
 
         for i in ingest_indexes():
-            if (i % 20) == 0:
-                logger.info('flushing updates')
+            if (i % self.flush_packet_db) == 0:
+                logger.info('flushing updates at index %s', i)
                 self.db_session.commit()
                 self.db_session = self.mk_session()
 