diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index 9b4faa9..8e09e23 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,123 +1,126 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import time +from urllib3.util import parse_url from ..core.page_by_page_lister import PageByPageHttpLister from .models import GitLabModel class GitLabLister(PageByPageHttpLister): # Template path expecting an integer that represents the page id PATH_TEMPLATE = '/projects?page=%d&order_by=id' MODEL = GitLabModel LISTER_NAME = 'gitlab' - def __init__(self, api_baseurl=None, instance=None, + def __init__(self, api_baseurl, instance=None, override_config=None, sort='asc', per_page=20): super().__init__(api_baseurl=api_baseurl, override_config=override_config) + if instance is None: + instance = parse_url(api_baseurl).host self.instance = instance self.PATH_TEMPLATE = '%s&sort=%s' % (self.PATH_TEMPLATE, sort) if per_page != 20: self.PATH_TEMPLATE = '%s&per_page=%s' % ( self.PATH_TEMPLATE, per_page) @property def ADDITIONAL_CONFIG(self): """Override additional config as the 'credentials' structure change between the ancestor classes and this class. cf. request_params method below """ default_config = super().ADDITIONAL_CONFIG # 'credentials' is a dict of (instance, {username, password}) dict default_config['credentials'] = ('dict', {}) return default_config def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. For the gitlab lister, the 'credentials' entries is configured per instance. For example:: - credentials: - gitlab.com: - username: user0 password: - username: user1 password: - ... - other-gitlab-instance: ... """ params = { 'headers': self.request_headers() or {} } creds_lister = self.config['credentials'].get(self.instance) if creds_lister: auth = random.choice(creds_lister) if auth: params['auth'] = (auth['username'], auth['password']) return params def uid(self, repo): return '%s/%s' % (self.instance, repo['path_with_namespace']) def get_model_from_repo(self, repo): return { 'instance': self.instance, 'uid': self.uid(repo), 'name': repo['name'], 'full_name': repo['path_with_namespace'], 'html_url': repo['web_url'], 'origin_url': repo['http_url_to_repo'], 'origin_type': 'git', 'description': repo['description'], } def transport_quota_check(self, response): """Deal with rate limit if any. """ # not all gitlab instance have rate limit if 'RateLimit-Remaining' in response.headers: reqs_remaining = int(response.headers['RateLimit-Remaining']) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 def _get_int(self, headers, key): _val = headers.get(key) if _val: return int(_val) def get_next_target_from_response(self, response): """Determine the next page identifier. """ return self._get_int(response.headers, 'x-next-page') def get_pages_information(self): """Determine pages information. """ response = self.transport_head(identifier=1) if not response.ok: raise ValueError( 'Problem during information fetch: %s' % response.status_code) h = response.headers return (self._get_int(h, 'x-total'), self._get_int(h, 'x-total-pages'), self._get_int(h, 'x-per-page')) def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index cd138d8..5f6a028 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,69 +1,69 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random from celery import group from swh.scheduler.celery_backend.config import app from swh.scheduler.task import SWHTask from .. import utils from .lister import GitLabLister NBPAGES = 10 def new_lister(api_baseurl='https://gitlab.com/api/v4', - instance='gitlab', sort='asc', per_page=20): + instance=None, sort='asc', per_page=20): return GitLabLister( api_baseurl=api_baseurl, instance=instance, sort=sort, per_page=per_page) @app.task(name='swh.lister.gitlab.tasks.IncrementalGitLabLister', base=SWHTask, bind=True) def incremental_gitlab_lister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister_args['sort'] = 'desc' lister = new_lister(**lister_args) total_pages = lister.get_pages_information()[1] # stopping as soon as existing origins for that instance are detected lister.run(min_bound=1, max_bound=total_pages, check_existence=True) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.gitlab.tasks.RangeGitLabLister', base=SWHTask, bind=True) def range_gitlab_lister(self, start, end, **lister_args): self.log.debug('%s(start=%s, end=%d), lister_args=%s' % ( self.name, start, end, lister_args)) lister = new_lister(**lister_args) lister.run(min_bound=start, max_bound=end) self.log.debug('%s OK' % (self.name)) @app.task(name='swh.lister.gitlab.tasks.FullGitLabRelister', base=SWHTask, bind=True) def full_gitlab_relister(self, **lister_args): self.log.debug('%s, lister_args=%s' % ( self.name, lister_args)) lister = new_lister(**lister_args) _, total_pages, _ = lister.get_pages_information() ranges = list(utils.split_range(total_pages, NBPAGES)) random.shuffle(ranges) promise = group(range_gitlab_lister.s(minv, maxv, **lister_args) for minv, maxv in ranges)() self.log.debug('%s OK (spawned %s subtasks)' % (self.name, len(ranges))) promise.save() return promise.id @app.task(name='swh.lister.gitlab.tasks.ping', base=SWHTask, bind=True) def ping(self): self.log.debug(self.name) return 'OK' diff --git a/swh/lister/gitlab/tests/test_tasks.py b/swh/lister/gitlab/tests/test_tasks.py index 6438ca1..f8d0a81 100644 --- a/swh/lister/gitlab/tests/test_tasks.py +++ b/swh/lister/gitlab/tests/test_tasks.py @@ -1,102 +1,150 @@ from time import sleep from celery.result import GroupResult from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.gitlab.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.gitlab.tasks.GitLabLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 10, None) res = swh_app.send_task( 'swh.lister.gitlab.tasks.IncrementalGitLabLister') assert res res.wait() assert res.successful() lister.assert_called_once_with( api_baseurl='https://gitlab.com/api/v4', - instance='gitlab', sort='desc', per_page=20) + instance=None, sort='desc', per_page=20) lister.db_last_index.assert_not_called() lister.get_pages_information.assert_called_once_with() lister.run.assert_called_once_with( min_bound=1, max_bound=10, check_existence=True) @patch('swh.lister.gitlab.tasks.GitLabLister') def test_range(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None res = swh_app.send_task( 'swh.lister.gitlab.tasks.RangeGitLabLister', kwargs=dict(start=12, end=42)) assert res res.wait() assert res.successful() lister.assert_called_once_with( api_baseurl='https://gitlab.com/api/v4', - instance='gitlab', sort='asc', per_page=20) + instance=None, sort='asc', per_page=20) lister.db_last_index.assert_not_called() lister.run.assert_called_once_with(min_bound=12, max_bound=42) @patch('swh.lister.gitlab.tasks.GitLabLister') def test_relister(lister, swh_app, celery_session_worker): # setup the mocked GitlabLister lister.return_value = lister lister.run.return_value = None lister.get_pages_information.return_value = (None, 85, None) lister.db_partition_indices.return_value = [ (i, i+9) for i in range(0, 80, 10)] + [(80, 85)] res = swh_app.send_task( 'swh.lister.gitlab.tasks.FullGitLabRelister') assert res res.wait() assert res.successful() # retrieve the GroupResult for this task and wait for all the subtasks # to complete promise_id = res.result assert promise_id promise = GroupResult.restore(promise_id, app=swh_app) for i in range(5): if promise.ready(): break sleep(1) lister.assert_called_with( api_baseurl='https://gitlab.com/api/v4', - instance='gitlab', sort='asc', per_page=20) + instance=None, sort='asc', per_page=20) + + # one by the FullGitlabRelister task + # + 9 for the RangeGitlabLister subtasks + assert lister.call_count == 10 + + lister.db_last_index.assert_not_called() + lister.db_partition_indices.assert_not_called() + lister.get_pages_information.assert_called_once_with() + + # lister.run should have been called once per partition interval + for i in range(8): + # XXX inconsistent behavior: max_bound is EXCLUDED here + assert (dict(min_bound=10*i, max_bound=10*i + 10),) \ + in lister.run.call_args_list + assert (dict(min_bound=80, max_bound=85),) \ + in lister.run.call_args_list + + +@patch('swh.lister.gitlab.tasks.GitLabLister') +def test_relister_instance(lister, swh_app, celery_session_worker): + # setup the mocked GitlabLister + lister.return_value = lister + lister.run.return_value = None + lister.get_pages_information.return_value = (None, 85, None) + lister.db_partition_indices.return_value = [ + (i, i+9) for i in range(0, 80, 10)] + [(80, 85)] + + res = swh_app.send_task( + 'swh.lister.gitlab.tasks.FullGitLabRelister', + kwargs=dict(api_baseurl='https://0xacab.org/api/v4')) + assert res + + res.wait() + assert res.successful() + + # retrieve the GroupResult for this task and wait for all the subtasks + # to complete + promise_id = res.result + assert promise_id + promise = GroupResult.restore(promise_id, app=swh_app) + for i in range(5): + if promise.ready(): + break + sleep(1) + + lister.assert_called_with( + api_baseurl='https://0xacab.org/api/v4', + instance=None, sort='asc', per_page=20) # one by the FullGitlabRelister task # + 9 for the RangeGitlabLister subtasks assert lister.call_count == 10 lister.db_last_index.assert_not_called() lister.db_partition_indices.assert_not_called() lister.get_pages_information.assert_called_once_with() # lister.run should have been called once per partition interval for i in range(8): # XXX inconsistent behavior: max_bound is EXCLUDED here assert (dict(min_bound=10*i, max_bound=10*i + 10),) \ in lister.run.call_args_list assert (dict(min_bound=80, max_bound=85),) \ in lister.run.call_args_list