# Patch overview (reviewer preamble; everything from the first "diff --git"
# header onward is unchanged).
#
# Files touched:
#   * swh/lister/core/lister_transports.py
#       - Adds SWHListerHttpTransport.request_instance_credentials(), which
#         looks up config['credentials'][LISTER_NAME][instance] using .get()
#         at every level and returns {} when nothing is configured.
#       - SWHListerHttpTransport.request_params() now calls that helper and
#         returns the headers-only params early when no credentials are found,
#         instead of indexing self.config['credentials'] directly.
#   * swh/lister/phabricator/lister.py
#       - The API token is no longer formatted into api_baseurl; it is kept on
#         self.api_token and PATH_TEMPLATE now starts with '?' instead of '&'.
#       - New request_params() override passes the token as the 'api.token'
#         entry of the requests 'params' dict (built by _build_query_params),
#         taking it from self.api_token when set, otherwise from the
#         'password' field of the first configured instance credential.
#         It raises ValueError when neither source provides a token.
#       - __init__ signature becomes
#         (forge_url, instance=None, api_token=None, override_config=None).
#   * swh/lister/phabricator/tasks.py and tests/test_tasks.py
#       - new_lister() defaults api_token to None (was '') and the test
#         expectation is updated to match.
#
# Review notes (to be confirmed, not established by the patch itself):
#   - NOTE(review): api_token used to be the second positional parameter of
#     PhabricatorLister.__init__; a positional caller would now bind it to
#     `instance`. The only caller visible here (tasks.new_lister) uses
#     keywords. Confirm there are no positional callers elsewhere.
#   - NOTE(review): the Phabricator override always uses instance_creds[0],
#     while the base request_params picks random.choice(creds). Confirm the
#     difference is intended.
#   - NOTE(review): request_instance_credentials() returns {} when nothing is
#     configured, although both consumers treat the value as a list (they only
#     rely on it being falsy in that case).
#   - NOTE(review): the module-level `logger` added to phabricator/lister.py
#     is not used anywhere in the visible hunk.
#   - NOTE(review): the docstring of the Phabricator request_params shows empty
#     placeholders ("phabricator: : - username: password:"). Presumably
#     angle-bracket placeholders were lost when this text was extracted; verify
#     against the original change.
#   - NOTE(review): line breaks and indentation of this patch are collapsed;
#     it will not apply as-is and must be regenerated from the original change.
diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 75bc6ad..a2202bd 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,207 +1,222 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict try: from swh.lister._version import __version__ except ImportError: __version__ = 'devel' from .abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class SWHListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with SWHListerBase or a subclass of it. """ PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } + def request_instance_credentials(self): + """Returns dictionary of any credentials configuration needed by the + forge instance to list. + + Returns: + dict of credentials per instance lister or {} if none. 
+ + """ + all_creds = self.config.get('credentials') + if not all_creds: + return {} + lister_creds = all_creds.get(self.LISTER_NAME, {}) + creds = lister_creds.get(self.instance, {}) + return creds + def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.api_baseurl + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. This uses credentials if any are provided. The 'credentials' configuration is expected to be a dict of multiple levels. The first level is the lister's name, the second is the lister's instance name. For example: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} - all_creds = self.config['credentials'] - lister_creds = all_creds.get(self.LISTER_NAME, {}) - creds = lister_creds.get(self.instance, {}) + creds = self.request_instance_credentials() + if not creds: + return params auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements SWHListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. 
( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, api_baseurl=None): if not api_baseurl: raise NameError('HTTP Lister Transport requires api_baseurl.') self.api_baseurl = api_baseurl # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: logger.warning('Failed to fetch %s: %s', path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements SWHListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements SWHListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? 
s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(SWHListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with SWHListerBase or a subclass of it. """ PAGE = AbstractAttribute("The server api's unique page to retrieve and " "parse for information") PATH_TEMPLATE = None # we do not use it def __init__(self, api_baseurl=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/phabricator/lister.py b/swh/lister/phabricator/lister.py index c02103d..7009c10 100644 --- a/swh/lister/phabricator/lister.py +++ b/swh/lister/phabricator/lister.py @@ -1,143 +1,178 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging + import urllib.parse from swh.lister.core.indexing_lister import SWHIndexingHttpLister from swh.lister.phabricator.models import PhabricatorModel from collections import defaultdict +logger = logging.getLogger(__name__) -class PhabricatorLister(SWHIndexingHttpLister): - PATH_TEMPLATE = '&order=oldest&attachments[uris]=1&after=%s' +class PhabricatorLister(SWHIndexingHttpLister): + PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s' MODEL = PhabricatorModel LISTER_NAME = 'phabricator' - def __init__(self, forge_url, api_token, instance=None, + def __init__(self, forge_url, instance=None, api_token=None, override_config=None): if forge_url.endswith("/"): forge_url = forge_url[:-1] self.forge_url = forge_url - api_endpoint = ('api/diffusion.repository.' 
- 'search?api.token=%s') % api_token - api_baseurl = '%s/%s' % (forge_url, api_endpoint) + api_baseurl = '%s/api/diffusion.repository.search' % forge_url + self.api_token = api_token if not instance: instance = urllib.parse.urlparse(forge_url).hostname self.instance = instance super().__init__(api_baseurl=api_baseurl, override_config=override_config) + def _build_query_params(self, params, api_token): + """Build query params to include the forge's api token + + Returns: + updated params dict with 'params' entry. + + """ + params.update({'params': {'api.token': api_token}}) + return params + + def request_params(self, identifier): + """Override the default params behavior to retrieve the api token + + Credentials are stored as: + + credentials: + phabricator: + : + - username: + password: + + """ + params = {} + params['headers'] = self.request_headers() or {} + if self.api_token: + return self._build_query_params(params, self.api_token) + instance_creds = self.request_instance_credentials() + if not instance_creds: + raise ValueError( + 'Phabricator forge needs authentication credential to list.') + api_token = instance_creds[0]['password'] + return self._build_query_params(params, api_token) + def request_headers(self): """ (Override) Set requests headers to send when querying the Phabricator API """ return {'User-Agent': 'Software Heritage phabricator lister', 'Accept': 'application/json'} def get_model_from_repo(self, repo): url = get_repo_url(repo['attachments']['uris']['uris']) if url is None: return None return { 'uid': self.forge_url + str(repo['id']), 'indexable': repo['id'], 'name': repo['fields']['shortName'], 'full_name': repo['fields']['name'], 'html_url': url, 'origin_url': url, 'description': None, 'origin_type': repo['fields']['vcs'] } def get_next_target_from_response(self, response): body = response.json()['result']['cursor'] if body['after'] != 'null': return body['after'] else: return None def transport_response_simplified(self, response): repos 
= response.json() if repos['result'] is None: raise ValueError( 'Problem during information fetch: %s' % repos['error_code']) repos = repos['result']['data'] return [self.get_model_from_repo(repo) for repo in repos] def filter_before_inject(self, models_list): """ (Overrides) SWHIndexingLister.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [m for m in models_list if m is not None] return super().filter_before_inject(models_list) def _bootstrap_repositories_listing(self): """ Method called when no min_bound value has been provided to the lister. Its purpose is to: 1. get the first repository data hosted on the Phabricator instance 2. inject them into the lister database 3. return the first repository index to start the listing after that value Returns: int: The first repository index """ params = '&order=oldest&limit=1' response = self.safely_issue_request(params) models_list = self.transport_response_simplified(response) self.max_index = models_list[0]['indexable'] models_list = self.filter_before_inject(models_list) injected = self.inject_repo_data_into_db(models_list) self.schedule_missing_tasks(models_list, injected) return self.max_index def run(self, min_bound=None, max_bound=None): """ (Override) Run the lister on the specified Phabricator instance Args: min_bound (int): Optional repository index to start the listing after it max_bound (int): Optional repository index to stop the listing after it """ # initial call to the lister, we need to bootstrap it in that case if min_bound is None: min_bound = self._bootstrap_repositories_listing() super().run(min_bound, max_bound) def get_repo_url(attachments): """ Return url for a hosted repository from its uris attachments according to the following priority lists: * protocol: https > http * identifier: shortname > callsign > id """ processed_urls = defaultdict(dict) for uri in attachments: protocol = uri['fields']['builtin']['protocol'] url = 
uri['fields']['uri']['effective'] identifier = uri['fields']['builtin']['identifier'] if protocol in ('http', 'https'): processed_urls[protocol][identifier] = url elif protocol is None: for protocol in ('https', 'http'): if url.startswith(protocol): processed_urls[protocol]['undefined'] = url break for protocol in ['https', 'http']: for identifier in ['shortname', 'callsign', 'id', 'undefined']: if (protocol in processed_urls and identifier in processed_urls[protocol]): return processed_urls[protocol][identifier] return None diff --git a/swh/lister/phabricator/tasks.py b/swh/lister/phabricator/tasks.py index 3ebb981..ce37fa4 100644 --- a/swh/lister/phabricator/tasks.py +++ b/swh/lister/phabricator/tasks.py @@ -1,29 +1,29 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.scheduler.celery_backend.config import app from swh.lister.phabricator.lister import PhabricatorLister -def new_lister(forge_url='https://forge.softwareheritage.org', api_token='', - instance='swh', **kw): - return PhabricatorLister(forge_url=forge_url, api_token=api_token, - instance=instance, **kw) +def new_lister(forge_url='https://forge.softwareheritage.org', instance='swh', + api_token=None, **kw): + return PhabricatorLister( + forge_url=forge_url, instance=instance, api_token=api_token, **kw) @app.task(name=__name__ + '.IncrementalPhabricatorLister') def incremental_phabricator_lister(**lister_args): lister = new_lister(**lister_args) lister.run(min_bound=lister.db_last_index()) @app.task(name=__name__ + '.FullPhabricatorLister') def full_phabricator_lister(**lister_args): lister = new_lister(**lister_args) lister.run() @app.task(name=__name__ + '.ping') def ping(): return 'OK' diff --git a/swh/lister/phabricator/tests/test_tasks.py b/swh/lister/phabricator/tests/test_tasks.py index a97196f..0aa27d6 100644 --- a/swh/lister/phabricator/tests/test_tasks.py +++ 
b/swh/lister/phabricator/tests/test_tasks.py @@ -1,30 +1,30 @@ from unittest.mock import patch def test_ping(swh_app, celery_session_worker): res = swh_app.send_task( 'swh.lister.phabricator.tasks.ping') assert res res.wait() assert res.successful() assert res.result == 'OK' @patch('swh.lister.phabricator.tasks.PhabricatorLister') def test_incremental(lister, swh_app, celery_session_worker): # setup the mocked PhabricatorLister lister.return_value = lister lister.db_last_index.return_value = 42 lister.run.return_value = None res = swh_app.send_task( 'swh.lister.phabricator.tasks.IncrementalPhabricatorLister') assert res res.wait() assert res.successful() lister.assert_called_once_with( - api_token='', forge_url='https://forge.softwareheritage.org', + api_token=None, forge_url='https://forge.softwareheritage.org', instance='swh') lister.db_last_index.assert_called_once_with() lister.run.assert_called_once_with(min_bound=42)