diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 1b4d5b2..d681743 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,134 +1,153 @@ # Copyright (C) 2017 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import requests import xmltodict try: from swh.lister._version import __version__ except ImportError: __version__ = 'devel' from .abstractattribute import AbstractAttribute from .lister_base import FetchError class SWHListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with SWHListerBase or a subclass of it. """ PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.api_baseurl + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the request headers is needed. 
""" params = {} params['headers'] = self.request_headers() or {} creds = self.config['credentials'] auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements SWHListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, api_baseurl=None): if not api_baseurl: raise NameError('HTTP Lister Transport requires api_baseurl.') self.api_baseurl = api_baseurl # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ - def transport_request(self, identifier): - """Implements SWHListerBase.transport_request for HTTP using Requests. + def _transport_action(self, identifier, method='get'): + """Permit to ask information to the api prior to actually executing + query. + """ path = self.request_uri(identifier) params = self.request_params(identifier) try: - response = self.session.get(path, **params) + if method == 'head': + response = self.session.head(path, **params) + else: + response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response + def transport_head(self, identifier): + """Retrieve head information on api. 
+ + """ + return self._transport_action(identifier, method='head') + + def transport_request(self, identifier): + """Implements SWHListerBase.transport_request for HTTP using Requests. + + Retrieve get information on api. + + """ + return self._transport_action(identifier) + def transport_response_to_string(self, response): """Implements SWHListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except: # not xml s += pformat(response.text) return s diff --git a/swh/lister/core/paging_lister.py b/swh/lister/core/paging_lister.py index f3c5b5f..34457b2 100644 --- a/swh/lister/core/paging_lister.py +++ b/swh/lister/core/paging_lister.py @@ -1,117 +1,138 @@ # Copyright (C) 2015-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from .lister_transports import SWHListerHttpTransport from .lister_base import SWHListerBase class SWHPagingLister(SWHListerBase): """Lister* intermediate class for any service that follows the simple pagination page pattern. - Client sends a request to list repositories starting from a given page identifier. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories (per page) starting from a given index. And, if available, some indication of the next page index for fetching the remaining repository data. See :class:`swh.lister.core.lister_base.SWHListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class. 
Then provide the required overrides in addition to any unmet implementation/override requirements of this class's base (see parent class and member docstrings for details). Required Overrides:: def get_next_target_from_response """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint page given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use the headers links to provide the next page. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass + @abc.abstractmethod + def get_pages_information(self): + """Find the total number of pages. + + Implementation of this method depends on the server API spec + and the shape of the network response object returned by the + transport_request method. + + For example, some api can use dedicated headers: + - x-total-pages to provide the total number of pages + - x-total to provide the total number of repositories + - x-per-page to provide the number of elements per page + + Returns: + tuple (total number of repositories, total number of + pages, per_page) + + """ + pass + # You probably don't need to override anything below this line. def run(self, min_index=None, max_index=None): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring. Continually fetching sublists until either there is no next index reference given or the given next index is greater than the desired max_index. 
Args: min_index (indexable type): optional index to start from max_index (indexable type): optional index to stop at Returns: nothing """ index = min_index or '' loop_count = 0 + self.min_index = min_index self.max_index = max_index while self.is_within_bounds(index, self.min_index, self.max_index): logging.info('listing repos starting at %s' % index) response, injected_repos = self.ingest_data(index) next_index = self.get_next_target_from_response(response) # termination condition if (next_index is None) or (next_index == index): logging.info('stopping after index %s, no next link found' % index) break else: index = next_index loop_count += 1 if loop_count == 20: logging.info('flushing updates') loop_count = 0 self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class SWHPagingHttpLister(SWHListerHttpTransport, SWHPagingLister): """Convenience class for ensuring right lookup and init order when combining SWHPagingLister and SWHListerHttpTransport. 
""" def __init__(self, lister_name=None, api_baseurl=None, override_config=None): SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl) SWHPagingLister.__init__(self, lister_name=lister_name, override_config=override_config) diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index fde974f..78f7736 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,113 +1,130 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re import time from ..core.paging_lister import SWHPagingHttpLister from .models import GitLabModel class GitLabLister(SWHPagingHttpLister): # Template path expecting an integer that represents the page id PATH_TEMPLATE = '/projects?page=%d&order_by=id&sort=asc&simple=true' API_URL_INDEX_RE = re.compile(r'^.*/projects.*page=(\d+).*') MODEL = GitLabModel @property def CONFIG_BASE_FILENAME(self): """One gitlab lister for all instances. We discriminate between the origin on a per instance basis in the table. """ return 'lister-gitlab' @property def ADDITIONAL_CONFIG(self): """Override additional config as the 'credentials' structure change between the ancestor classes and this class. cf. request_params method below """ return { 'lister_db_url': ('str', 'postgresql:///lister-gitlab'), 'credentials': # credentials is a dict ('dict', {}), 'cache_responses': ('bool', False), 'cache_dir': ('str', '~/.cache/swh/lister/%s' % self.lister_name), } def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. For the gitlab lister, the 'credentials' entries is configured per instance. For example: - credentials: - gitlab.com: - username: user0 password: - username: user1 password: - ... - other-gitlab-instance: ... 
""" params = { 'headers': self.request_headers() or {} } # Retrieve the credentials per instance creds = self.config['credentials'] if creds: creds_lister = creds[self.lister_name] auth = random.choice(creds_lister) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def get_model_from_repo(self, repo): return { 'instance': self.lister_name, 'uid': repo['id'], 'indexable': repo['id'], 'name': repo['name'], 'full_name': repo['path_with_namespace'], 'html_url': repo['web_url'], 'origin_url': repo['http_url_to_repo'], 'origin_type': 'git', 'description': repo['description'], } def transport_quota_check(self, response): """Deal with rate limit if any. """ # not all gitlab instance have rate limit if 'RateLimit-Remaining' in response.headers: reqs_remaining = int(response.headers['RateLimit-Remaining']) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 def get_next_target_from_response(self, response): - """Deal with pagination + """Determine the next page identifier. """ if 'next' in response.links: next_url = response.links['next']['url'] return int(self.API_URL_INDEX_RE.match(next_url).group(1)) return None + def get_pages_information(self): + """Determine some pages information. 
+ + """ + response = self.transport_head(identifier=1) + h = response.headers + total = h.get('x-total', h.get('X-Total')) + total_pages = h.get('x-total-pages', h.get('X-Total-Pages')) + per_page = h.get('x-per-page', h.get('X-Per-Page')) + if total is not None: + total = int(total) + if total_pages is not None: + total_pages = int(total_pages) + if per_page is not None: + per_page = int(per_page) + return total, total_pages, per_page + def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] diff --git a/swh/lister/gitlab/tasks.py b/swh/lister/gitlab/tasks.py index 8046225..63bc179 100644 --- a/swh/lister/gitlab/tasks.py +++ b/swh/lister/gitlab/tasks.py @@ -1,24 +1,44 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.lister.core.tasks import ListerTaskBase, RangeListerTask +import random +from celery import group +from ..core.tasks import ListerTaskBase, RangeListerTask from .lister import GitLabLister -class GitLabDotComListerTask(ListerTaskBase): +class GitLabListerTask(ListerTaskBase): def new_lister(self, lister_name='gitlab.com', api_baseurl='https://gitlab.com/api/v4'): return GitLabLister( lister_name=lister_name, api_baseurl=api_baseurl) -class RangeGitLabLister(GitLabDotComListerTask, RangeListerTask): +class RangeGitLabLister(GitLabListerTask, RangeListerTask): """GitLab lister working on specified range (start, end) arguments. 
""" task_queue = 'swh_lister_gitlab_refresh'
+class FullGitLabRelister(GitLabListerTask):
+    """Schedule a full relisting of a GitLab instance.
+
+    The listing is cut into ranges of page numbers and one
+    RangeGitLabLister task is sent per range, in random order.
+
+    """
+    task_queue = 'swh_lister_gitlab_refresh'
+
+    # Maximum number of pages handed to each RangeGitLabLister task.
+    nb_pages = 10
+
+    @staticmethod
+    def _split_pages(total_pages, nb_pages):
+        """Cut pages 1..total_pages into (first, last) page ranges.
+
+        Both bounds are pages meant to be listed: the paging lister's
+        run() documents that it stops once the next index is greater
+        than max_index, so consecutive ranges do not overlap.
+
+        Args:
+            total_pages (int): number of the last page to list
+            nb_pages (int): maximum number of pages per range
+
+        Returns:
+            list of (first page, last page) tuples; empty when
+            total_pages is lower than 1
+
+        """
+        return [(first, min(first + nb_pages - 1, total_pages))
+                for first in range(1, total_pages + 1, nb_pages)]
+
+    def run_task(self, *args, **kwargs):
+        """Split the instance's pages into ranges and send one range
+        task per range.
+
+        Args:
+            *args, **kwargs: passed to new_lister and forwarded to each
+                range task after its (first page, last page) arguments
+
+        Raises:
+            ValueError: if the number of pages cannot be determined
+                from get_pages_information
+
+        """
+        lister = self.new_lister(*args, **kwargs)
+        total, total_pages, per_page = lister.get_pages_information()
+        if total_pages is None:
+            # get_pages_information returns None for any missing header;
+            # fall back on the repository count when it is available.
+            if total is None or not per_page:
+                raise ValueError(
+                    'Cannot determine the number of pages to list')
+            total_pages = -(-total // per_page)  # ceiling division
+
+        # Ranges hold page numbers starting at 1: PATH_TEMPLATE expects
+        # a page id, and run() replaces a falsy min_index such as 0
+        # by ''.
+        ranges = self._split_pages(total_pages, self.nb_pages)
+        random.shuffle(ranges)
+        range_task = RangeGitLabLister()
+        group(range_task.s(minv, maxv, *args, **kwargs)
+              for minv, maxv in ranges)()