diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py --- a/swh/lister/bitbucket/lister.py +++ b/swh/lister/bitbucket/lister.py @@ -2,10 +2,11 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime import logging import iso8601 +from datetime import datetime + from urllib import parse from swh.lister.bitbucket.models import BitBucketModel @@ -23,6 +24,7 @@ MODEL = BitBucketModel LISTER_NAME = 'bitbucket' instance = 'bitbucket' + default_min_bound = datetime.utcfromtimestamp(0).isoformat() def __init__(self, api_baseurl, override_config=None, per_page=100): super().__init__( @@ -51,21 +53,6 @@ repos = response.json()['values'] return [self.get_model_from_repo(repo) for repo in repos] - def db_first_index(self): - """For the first time listing, there is no data in db, so fallback to the - bitbucket starting year. - - """ - return super().db_first_index() or '2008-01-01T00:00:00Z' - - def db_last_index(self): - """For the first time listing, there is no data in db, so fallback to the time - of the first run as max date. - - """ - return super().db_last_index() or datetime.datetime.now( - tz=datetime.timezone.utc).isoformat() - def request_uri(self, identifier): return super().request_uri(identifier or '1970-01-01') diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -30,12 +30,21 @@ @app.task(name=__name__ + '.FullBitBucketRelister', bind=True) def full_bitbucket_relister(self, split=None, **lister_args): + """Relist from the beginning of what's already been listed. + + It's not to be called for an initial listing. 
+ + """ lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) + if not ranges: + self.log.info('Nothing to list') + return + random.shuffle(ranges) promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() - self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) + self.log.debug('%s OK (spawned %s subtasks)', self.name, len(ranges)) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -16,6 +16,8 @@ class IndexingLister(ListerBase): + default_min_bound = '' + """Lister* intermediate class for any service that follows the pattern: - The service must report at least one stable unique identifier, known @@ -94,17 +96,18 @@ def db_partition_indices(self, partition_size): """Describe an index-space compartmentalization of the db table - in equal sized chunks. This is used to describe min&max bounds for - parallelizing fetch tasks. + in equal sized chunks. This is used to describe min&max bounds for + parallelizing fetch tasks. 
Args: partition_size (int): desired size to make each partition + Returns: a list of tuples (begin, end) of indexable value that - declare approximately equal-sized ranges of existing - repos - """ + declare approximately equal-sized ranges of existing + repos + """ n = max(self.db_num_entries(), 10) partition_size = min(partition_size, n) n_partitions = n // partition_size @@ -113,7 +116,8 @@ max_index = self.db_last_index() if not min_index or not max_index: - raise ValueError("Can't partition an empty range") + # Nothing to list + return [] if isinstance(min_index, str): def format_bound(bound): @@ -148,8 +152,6 @@ t = self.db_session.query(func.min(self.MODEL.indexable)).first() if t: return t[0] - else: - return None def db_last_index(self): """Look in the db for the largest indexable value @@ -160,8 +162,6 @@ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] - else: - return None def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end. 
@@ -204,7 +204,7 @@ self.max_index = max_bound def ingest_indexes(): - index = min_bound or '' + index = min_bound or self.default_min_bound for i in count(1): response, injected_repos = self.ingest_data(index) if not response and not injected_repos: @@ -222,11 +222,12 @@ index) return index = next_index + logger.debug('Index: %s', index) yield i for i in ingest_indexes(): if (i % 20) == 0: - logger.info('flushing updates') + logger.info('flushing updates at index %s', i) self.db_session.commit() self.db_session = self.mk_session() diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -34,15 +34,12 @@ reset_at = int(response.headers['X-RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay - else: - return False, 0 + return False, 0 def get_next_target_from_response(self, response): if 'next' in response.links: next_url = response.links['next']['url'] return int(self.API_URL_INDEX_RE.match(next_url).group(1)) - else: - return None def transport_response_simplified(self, response): repos = response.json() diff --git a/swh/lister/github/tasks.py b/swh/lister/github/tasks.py --- a/swh/lister/github/tasks.py +++ b/swh/lister/github/tasks.py @@ -1,4 +1,4 @@ -# Copyright (C) 2017-2018 the Software Heritage developers +# Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -30,8 +30,16 @@ @app.task(name=__name__ + '.FullGitHubRelister', bind=True) def full_github_relister(self, split=None, **lister_args): + """Relist from the beginning of what's already been listed. + + It's not to be called for an initial listing. 
+ + """ lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) + if not ranges: + self.log.info('Nothing to list') + return random.shuffle(ranges) promise = group(range_github_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -38,6 +38,11 @@ @app.task(name=__name__ + '.FullGitLabRelister', bind=True) def full_gitlab_relister(self, **lister_args): + """Full lister + + This should be renamed as such. + + """ lister = new_lister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py --- a/swh/lister/phabricator/lister.py +++ b/swh/lister/phabricator/lister.py @@ -31,6 +31,14 @@ super().__init__(api_baseurl=api_baseurl, override_config=override_config) + @property + def default_min_bound(self): + """Starting boundary when `min_bound` is not defined (db empty). This + is used within the fn:`run` call. + + """ + return self._bootstrap_repositories_listing() + def _build_query_params(self, params, api_token): """Build query params to include the forge's api token @@ -135,21 +143,6 @@ self.schedule_missing_tasks(models_list, injected) return self.max_index - def run(self, min_bound=None, max_bound=None): - """ - (Override) Run the lister on the specified Phabricator instance - - Args: - min_bound (int): Optional repository index to start the listing - after it - max_bound (int): Optional repository index to stop the listing - after it - """ - # initial call to the lister, we need to bootstrap it in that case - if min_bound is None: - min_bound = self._bootstrap_repositories_listing() - super().run(min_bound, max_bound) - def get_repo_url(attachments): """