diff --git a/swh/lister/bitbucket/lister.py b/swh/lister/bitbucket/lister.py index 2c00c68..c2e0292 100644 --- a/swh/lister/bitbucket/lister.py +++ b/swh/lister/bitbucket/lister.py @@ -1,86 +1,95 @@ # Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import iso8601 from urllib import parse from swh.lister.bitbucket.models import BitBucketModel from swh.lister.core.indexing_lister import IndexingHttpLister logger = logging.getLogger(__name__) +DEFAULT_BITBUCKET_PAGE = 10 + + class BitBucketLister(IndexingHttpLister): PATH_TEMPLATE = '/repositories?after=%s' MODEL = BitBucketModel LISTER_NAME = 'bitbucket' instance = 'bitbucket' + def __init__(self, api_baseurl, override_config=None, per_page=100): + super().__init__( + api_baseurl=api_baseurl, override_config=override_config) + if per_page != DEFAULT_BITBUCKET_PAGE: + self.PATH_TEMPLATE = '%s&pagelen=%s' % ( + self.PATH_TEMPLATE, per_page) + def get_model_from_repo(self, repo): return { 'uid': repo['uuid'], 'indexable': repo['created_on'], 'name': repo['name'], 'full_name': repo['full_name'], 'html_url': repo['links']['html']['href'], 'origin_url': repo['links']['clone'][0]['href'], 'origin_type': repo['scm'], } def get_next_target_from_response(self, response): body = response.json() if 'next' in body: return parse.unquote(body['next'].split('after=')[1]) def transport_response_simplified(self, response): repos = response.json()['values'] return [self.get_model_from_repo(repo) for repo in repos] def db_first_index(self): """For the first time listing, there is no data in db, so fallback to the bitbucket starting year. """ return super().db_first_index() or '2008-01-01T00:00:00Z' def db_last_index(self): """For the first time listing, there is no data in db, so fallback to the time of the first run as max date. """ return super().db_last_index() or datetime.datetime.now( tz=datetime.timezone.utc).isoformat() def request_uri(self, identifier): return super().request_uri(identifier or '1970-01-01') def is_within_bounds(self, inner, lower=None, upper=None): # values are expected to be str dates try: inner = iso8601.parse_date(inner) if lower: lower = iso8601.parse_date(lower) if upper: upper = iso8601.parse_date(upper) if lower is None and upper is None: return True elif lower is None: ret = inner <= upper elif upper is None: ret = inner >= lower else: ret = lower <= inner <= upper except Exception as e: - logger.error(str(e) + ': %s, %s, %s' % - (('inner=%s%s' % (type(inner), inner)), - ('lower=%s%s' % (type(lower), lower)), - ('upper=%s%s' % (type(upper), upper))) - ) + logger.error(str(e) + ': %s, %s, %s', + ('inner=%s%s' % (type(inner), inner)), + ('lower=%s%s' % (type(lower), lower)), + ('upper=%s%s' % (type(upper), upper))) raise return ret diff --git a/swh/lister/bitbucket/tasks.py b/swh/lister/bitbucket/tasks.py index 9985b48..5477b94 100644 --- a/swh/lister/bitbucket/tasks.py +++ b/swh/lister/bitbucket/tasks.py @@ -1,48 +1,48 @@ -# Copyright (C) 2017-2018 the Software Heritage developers +# Copyright (C) 2017-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from .lister import BitBucketLister GROUP_SPLIT = 10000 -def new_lister(api_baseurl='https://api.bitbucket.org/2.0'): - return BitBucketLister(api_baseurl=api_baseurl) +def new_lister(api_baseurl='https://api.bitbucket.org/2.0', per_page=100): + return BitBucketLister(api_baseurl=api_baseurl, per_page=per_page) @app.task(name=__name__ + '.IncrementalBitBucketLister') def incremental_bitbucket_lister(**lister_args): lister = new_lister(**lister_args) lister.run(min_bound=lister.db_last_index(), max_bound=None) @app.task(name=__name__ + '.RangeBitBucketLister') def range_bitbucket_lister(start, end, **lister_args): lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) @app.task(name=__name__ + '.FullBitBucketRelister', bind=True) def full_bitbucket_relister(self, split=None, **lister_args): lister = new_lister(**lister_args) ranges = lister.db_partition_indices(split or GROUP_SPLIT) random.shuffle(ranges) promise = group(range_bitbucket_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) try: promise.save() # so that we can restore the GroupResult in tests except (NotImplementedError, AttributeError): self.log.info('Unable to call save_group with current result backend.') return promise.id @app.task(name=__name__ + '.ping') def ping(): return 'OK' diff --git a/swh/lister/bitbucket/tests/test_tasks.py b/swh/lister/bitbucket/tests/test_tasks.py index 7674627..1e02b6f 100644 --- a/swh/lister/bitbucket/tests/test_tasks.py +++ b/swh/lister/bitbucket/tests/test_tasks.py @@ -1,89 +1,92 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.bitbucket.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None res = swh_app.send_task( 'swh.lister.bitbucket.tasks.IncrementalBitBucketLister') assert res res.wait() assert res.successful() - lister.assert_called_once_with(api_baseurl='https://api.bitbucket.org/2.0') + lister.assert_called_once_with( + api_baseurl='https://api.bitbucket.org/2.0', per_page=100) lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42, max_bound=None) @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_range(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.bitbucket.tasks.RangeBitBucketLister', kwargs=dict(start=12, end=42)) assert res res.wait() assert res.successful() - lister.assert_called_once_with(api_baseurl='https://api.bitbucket.org/2.0') + lister.assert_called_once_with( + api_baseurl='https://api.bitbucket.org/2.0', per_page=100) lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) @patch('swh.lister.bitbucket.tasks.BitBucketLister') def test_relister(lister, swh_app, celery_session_worker): # setup the mocked BitbucketLister lister.return_value = lister lister.run.return_value = None lister.db_partition_indices.return_value = [ (i, i+9) for i in range(0, 50, 10)] res = swh_app.send_task( 'swh.lister.bitbucket.tasks.FullBitBucketRelister') assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) - lister.assert_called_with(api_baseurl='https://api.bitbucket.org/2.0') + lister.assert_called_with( + api_baseurl='https://api.bitbucket.org/2.0', per_page=100) # one by the FullBitbucketRelister task # + 5 for the RangeBitbucketLister subtasks assert lister.call_count == 6 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_called_once_with(10000) # lister.run should have been called once per partition interval for i in range(5): assert (dict(min_bound=10*i, max_bound=10*i + 9),) \ in lister.run.call_args_list