diff --git a/swh/lister/__init__.py b/swh/lister/__init__.py index 600c231..840ceca 100644 --- a/swh/lister/__init__.py +++ b/swh/lister/__init__.py @@ -1,49 +1,52 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import pkg_resources logger = logging.getLogger(__name__) try: __version__ = pkg_resources.get_distribution('swh.lister').version except pkg_resources.DistributionNotFound: __version__ = 'devel' +USER_AGENT_TEMPLATE = 'Software Heritage Lister (%s)' +USER_AGENT = USER_AGENT_TEMPLATE % __version__ + LISTERS = {entry_point.name.split('.', 1)[1]: entry_point for entry_point in pkg_resources.iter_entry_points('swh.workers') if entry_point.name.split('.', 1)[0] == 'lister'} SUPPORTED_LISTERS = list(LISTERS) def get_lister(lister_name, db_url=None, **conf): """Instantiate a lister given its name. Args: lister_name (str): Lister's name conf (dict): Configuration dict (lister db cnx, policy, priority...) 
Returns: Tuple (instantiated lister, drop_tables function, init schema function, insert minimum data function) """ if lister_name not in LISTERS: raise ValueError( 'Invalid lister %s: only supported listers are %s' % (lister_name, SUPPORTED_LISTERS)) if db_url: conf['lister'] = {'cls': 'local', 'args': {'db': db_url}} registry_entry = LISTERS[lister_name].load()() lister_cls = registry_entry['lister'] lister = lister_cls(override_config=conf) return lister diff --git a/swh/lister/cgit/lister.py b/swh/lister/cgit/lister.py index d6c2077..37464c0 100644 --- a/swh/lister/cgit/lister.py +++ b/swh/lister/cgit/lister.py @@ -1,138 +1,142 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import logging from urllib.parse import urlparse, urljoin from bs4 import BeautifulSoup from requests import Session from requests.adapters import HTTPAdapter from .models import CGitModel from swh.core.utils import grouper +from swh.lister import USER_AGENT from swh.lister.core.lister_base import ListerBase logger = logging.getLogger(__name__) class CGitLister(ListerBase): """Lister class for CGit repositories. This lister will retrieve the list of published git repositories by parsing the HTML page(s) of the index retrieved at `url`. For each found git repository, a query is made at the given url found in this index to gather published "Clone" URLs to be used as origin URL for that git repo. If several "Clone" urls are provided, prefer the http/https one, if any, otherwise fall back to the first one.
A loader task is created for each git repository: Task: Type: load-git Policy: recurring Args: Example: Type: load-git Policy: recurring Args: 'https://git.savannah.gnu.org/git/elisp-es.git' """ MODEL = CGitModel DEFAULT_URL = 'https://git.savannah.gnu.org/cgit/' LISTER_NAME = 'cgit' url_prefix_present = True def __init__(self, url=None, instance=None, override_config=None): """Lister class for CGit repositories. Args: url (str): main URL of the CGit instance, i.e. url of the index of published git repositories on this instance. instance (str): Name of cgit instance. Defaults to url's hostname if unset. """ super().__init__(override_config=override_config) if url is None: url = self.config.get('url', self.DEFAULT_URL) self.url = url if not instance: instance = urlparse(url).hostname self.instance = instance self.session = Session() self.session.mount(self.url, HTTPAdapter(max_retries=3)) + self.session.headers = { + 'User-Agent': USER_AGENT, + } def run(self): total = 0 for repos in grouper(self.get_repos(), 10): models = list(filter(None, (self.build_model(repo) for repo in repos))) injected_repos = self.inject_repo_data_into_db(models) self.schedule_missing_tasks(models, injected_repos) self.db_session.commit() total += len(injected_repos) logger.debug('Scheduled %s tasks for %s', total, self.url) def get_repos(self): """Generate git 'project' URLs found on the current CGit server """ next_page = self.url while next_page: bs_idx = self.get_and_parse(next_page) for tr in bs_idx.find( 'div', {"class": "content"}).find_all( "tr", {"class": ""}): yield urljoin(self.url, tr.find('a')['href']) try: pager = bs_idx.find('ul', {'class': 'pager'}) current_page = pager.find('a', {'class': 'current'}) if current_page: next_page = current_page.parent.next_sibling.a['href'] next_page = urljoin(self.url, next_page) except (AttributeError, KeyError): # no pager, or no next page next_page = None def build_model(self, repo_url): """Given the URL of a git repo project page on a 
CGit server, return the repo description (dict) suitable for insertion in the db. """ bs = self.get_and_parse(repo_url) urls = [x['href'] for x in bs.find_all('a', {'rel': 'vcs-git'})] if not urls: return # look for the http/https url, if any, and use it as origin_url for url in urls: if urlparse(url).scheme in ('http', 'https'): origin_url = url break else: # otherwise, choose the first one origin_url = urls[0] return {'uid': repo_url, 'name': bs.find('a', title=re.compile('.+'))['title'], 'origin_type': 'git', 'instance': self.instance, 'origin_url': origin_url, } def get_and_parse(self, url): "Get the given url and parse the retrieved HTML using BeautifulSoup" return BeautifulSoup(self.session.get(url).text, features='html.parser') diff --git a/swh/lister/cgit/tests/test_lister.py b/swh/lister/cgit/tests/test_lister.py index b7887f0..a140cdd 100644 --- a/swh/lister/cgit/tests/test_lister.py +++ b/swh/lister/cgit/tests/test_lister.py @@ -1,66 +1,82 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from swh.lister import __version__ + + def test_lister_no_page(requests_mock_datadir, swh_listers): lister = swh_listers['cgit'] assert lister.url == 'https://git.savannah.gnu.org/cgit/' repos = list(lister.get_repos()) assert len(repos) == 977 assert repos[0] == 'https://git.savannah.gnu.org/cgit/elisp-es.git/' # note the url below is NOT a subpath of /cgit/ assert repos[-1] == 'https://git.savannah.gnu.org/path/to/yetris.git/' # noqa # note the url below is NOT on the same server assert repos[-2] == 'http://example.org/cgit/xstarcastle.git/' def test_lister_model(requests_mock_datadir, swh_listers): lister = swh_listers['cgit'] repo = next(lister.get_repos()) model = lister.build_model(repo) assert model == { 'uid': 'https://git.savannah.gnu.org/cgit/elisp-es.git/', 'name': 'elisp-es.git', 'origin_type': 'git', 'instance': 
'git.savannah.gnu.org', 'origin_url': 'https://git.savannah.gnu.org/git/elisp-es.git' } def test_lister_with_pages(requests_mock_datadir, swh_listers): lister = swh_listers['cgit'] lister.url = 'https://git.tizen/cgit/' repos = list(lister.get_repos()) # we should have 16 repos (listed on 3 pages) assert len(repos) == 16 def test_lister_run(requests_mock_datadir, swh_listers): lister = swh_listers['cgit'] lister.url = 'https://git.tizen/cgit/' lister.run() r = lister.scheduler.search_tasks(task_type='load-git') assert len(r) == 16 for row in r: assert row['type'] == 'load-git' # arguments check args = row['arguments']['args'] assert len(args) == 1 url = args[0] assert url.startswith('https://git.tizen') # kwargs kwargs = row['arguments']['kwargs'] assert kwargs == {} assert row['policy'] == 'recurring' assert row['priority'] is None + + +def test_lister_requests(requests_mock_datadir, swh_listers): + lister = swh_listers['cgit'] + lister.url = 'https://git.tizen/cgit/' + lister.run() + + assert len(requests_mock_datadir.request_history) != 0 + for request in requests_mock_datadir.request_history: + assert 'User-Agent' in request.headers + user_agent = request.headers['User-Agent'] + assert 'Software Heritage Lister' in user_agent + assert __version__ in user_agent diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 93b1038..fe9832d 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,232 +1,232 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict from typing import Optional, Union -from swh.lister import __version__ +from swh.lister import USER_AGENT_TEMPLATE, __version__ from 
.abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class ListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with ListerBase or a subclass of it. """ DEFAULT_URL = None # type: Optional[str] PATH_TEMPLATE = \ AbstractAttribute( 'string containing a python string format pattern that produces' ' the API endpoint path for listing stored repositories when given' ' an index, e.g., "/repositories?after=%s". To be implemented in' ' the API-specific class inheriting this.' ) # type: Union[AbstractAttribute, Optional[str]] EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { - 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version + 'User-Agent': USER_AGENT_TEMPLATE % self.lister_version } def request_instance_credentials(self): """Returns dictionary of any credentials configuration needed by the forge instance to list. The 'credentials' configuration is expected to be a dict of multiple levels. The first level is the lister's name, the second is the lister's instance name, which value is expected to be a list of credential structures (typically a couple username/password). For example: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... Returns: list of credential dicts for the current lister. 
""" all_creds = self.config.get('credentials') if not all_creds: return [] lister_creds = all_creds.get(self.LISTER_NAME, {}) creds = lister_creds.get(self.instance, []) return creds def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.url + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. This uses credentials if any are provided (see request_instance_credentials). MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} creds = self.request_instance_credentials() if not creds: return params auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements ListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, url=None): if not url: url = self.config.get('url') if not url: url = self.DEFAULT_URL if not url: raise NameError('HTTP Lister Transport requires an url.') self.url = url # eg. 
'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) logger.debug('path: %s', path) logger.debug('params: %s', params) logger.debug('method: %s', method) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: logger.warning('Failed to fetch %s: %s', path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements ListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements ListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(ListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with ListerBase or a subclass of it. 
""" PAGE = AbstractAttribute( "URL of the API's unique page to retrieve and parse " "for information") # type: Union[AbstractAttribute, str] PATH_TEMPLATE = None # we do not use it def __init__(self, url=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py index 8d8d6ba..908fd9c 100644 --- a/swh/lister/core/tests/test_lister.py +++ b/swh/lister/core/tests/test_lister.py @@ -1,433 +1,444 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import time from unittest import TestCase from unittest.mock import Mock, patch import requests_mock from sqlalchemy import create_engine from typing import Any, Callable, Optional, Pattern, Type, Union import swh.lister from swh.lister.core.abstractattribute import AbstractAttribute from swh.lister.tests.test_utils import init_db def noop(*args, **kwargs): pass def test_version_generation(): assert swh.lister.__version__ != 'devel', \ "Make sure swh.lister is installed (e.g. pip install -e .)" class HttpListerTesterBase(abc.ABC): """Testing base class for listers. This contains methods for both :class:`HttpSimpleListerTester` and :class:`HttpListerTester`. See :class:`swh.lister.gitlab.tests.test_lister` for an example of how to customize for a specific listing service. 
""" Lister = AbstractAttribute( 'Lister class to test') # type: Union[AbstractAttribute, Type[Any]] lister_subdir = AbstractAttribute( 'bitbucket, github, etc.') # type: Union[AbstractAttribute, str] good_api_response_file = AbstractAttribute( 'Example good response body') # type: Union[AbstractAttribute, str] LISTER_NAME = 'fake-lister' # May need to override this if the headers are used for something def response_headers(self, request): return {} # May need to override this if the server uses non-standard rate limiting # method. # Please keep the requested retry delay reasonably low. def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 429 context.headers['Retry-After'] = '1' return '{"error":"dummy"}' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rate_limit = 1 self.response = None self.fl = None self.helper = None self.scheduler_tasks = [] if self.__class__ != HttpListerTesterBase: self.run = TestCase.run.__get__(self, self.__class__) else: self.run = noop def mock_limit_n_response(self, n, request, context): self.fl.reset_backoff() if self.rate_limit <= n: return self.mock_rate_quota(n, request, context) else: return self.mock_response(request, context) def mock_limit_twice_response(self, request, context): return self.mock_limit_n_response(2, request, context) def get_api_response(self, identifier): fl = self.get_fl() if self.response is None: self.response = fl.safely_issue_request(identifier) return self.response def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). 
""" if override_config or self.fl is None: self.fl = self.Lister(url='https://fakeurl', override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() self.scheduler_tasks = [] return self.fl def disable_scheduler(self, fl): fl.schedule_missing_tasks = Mock(return_value=None) def mock_scheduler(self, fl): def _create_tasks(tasks): task_id = 0 current_nb_tasks = len(self.scheduler_tasks) if current_nb_tasks > 0: task_id = self.scheduler_tasks[-1]['id'] + 1 for task in tasks: scheduler_task = dict(task) scheduler_task.update({ 'status': 'next_run_not_scheduled', 'retries_left': 0, 'priority': None, 'id': task_id, 'current_interval': datetime.timedelta(days=64) }) self.scheduler_tasks.append(scheduler_task) task_id = task_id + 1 return self.scheduler_tasks[current_nb_tasks:] def _disable_tasks(task_ids): for task_id in task_ids: self.scheduler_tasks[task_id]['status'] = 'disabled' fl.scheduler.create_tasks = Mock(wraps=_create_tasks) fl.scheduler.disable_tasks = Mock(wraps=_disable_tasks) def disable_db(self, fl): fl.winnow_models = Mock(return_value=[]) fl.db_inject_repo = Mock(return_value=fl.MODEL()) fl.disable_deleted_repo_tasks = Mock(return_value=None) def init_db(self, db, model): engine = create_engine(db.url()) model.metadata.create_all(engine) @requests_mock.Mocker() def test_is_within_bounds(self, http_mocker): fl = self.get_fl() self.assertFalse(fl.is_within_bounds(1, 2, 3)) self.assertTrue(fl.is_within_bounds(2, 1, 3)) self.assertTrue(fl.is_within_bounds(1, 1, 1)) self.assertTrue(fl.is_within_bounds(1, None, None)) self.assertTrue(fl.is_within_bounds(1, None, 2)) self.assertTrue(fl.is_within_bounds(1, 0, None)) self.assertTrue(fl.is_within_bounds("b", "a", "c")) self.assertFalse(fl.is_within_bounds("a", "b", "c")) self.assertTrue(fl.is_within_bounds("a", None, "c")) self.assertTrue(fl.is_within_bounds("a", None, None)) self.assertTrue(fl.is_within_bounds("b", "a", None)) self.assertFalse(fl.is_within_bounds("a", "b", None)) 
self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03")) self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03")) with self.assertRaises(TypeError): fl.is_within_bounds(1.0, "b", None) with self.assertRaises(TypeError): fl.is_within_bounds("A:B", "A::B", None) class HttpListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.indexing_lister.IndexingHttpLister` See :class:`swh.lister.github.tests.test_gh_lister` for an example of how to customize for a specific listing service. """ last_index = AbstractAttribute( 'Last index ' 'in good_api_response') # type: Union[AbstractAttribute, int] first_index = AbstractAttribute( 'First index in ' ' good_api_response') # type: Union[AbstractAttribute, Optional[int]] bad_api_response_file = AbstractAttribute( 'Example bad response body') # type: Union[AbstractAttribute, str] entries_per_page = AbstractAttribute( 'Number of results in ' 'good response') # type: Union[AbstractAttribute, int] test_re = AbstractAttribute( 'Compiled regex matching the server url. Must capture the ' 'index value.') # type: Union[AbstractAttribute, Pattern] convert_type = str # type: Callable[..., Any] """static method used to convert the "request_index" to its right type (for indexing listers for example, this is in accordance with the model's "indexable" column). 
""" def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) req_index = self.request_index(request) if req_index == self.first_index: response_file = self.good_api_response_file else: response_file = self.bad_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() def request_index(self, request): m = self.test_re.search(request.path_url) if m and (len(m.groups()) > 0): return self.convert_type(m.group(1)) def create_fl_with_db(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) db = init_db() fl = self.get_fl(override_config={ 'lister': { 'cls': 'local', 'args': {'db': db.url()} } }) fl.db = db self.init_db(db, fl.MODEL) self.mock_scheduler(fl) return fl @requests_mock.Mocker() def test_fetch_no_bounds_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run() self.assertEqual(fl.db_last_index(), self.last_index) ingested_repos = list(fl.db_query_range(self.first_index, self.last_index)) self.assertEqual(len(ingested_repos), self.entries_per_page) @requests_mock.Mocker() def test_fetch_multiple_pages_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run(min_bound=self.first_index) self.assertEqual(fl.db_last_index(), self.last_index) partitions = fl.db_partition_indices(5) self.assertGreater(len(partitions), 0) for k in partitions: self.assertLessEqual(len(k), 5) self.assertGreater(len(k), 0) @requests_mock.Mocker() def test_fetch_none_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=1, max_bound=1) # stores no results # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_one_nodb(self, http_mocker): 
http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index, max_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_multiple_pages_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of repos listed by the lister """ http_mocker.get(self.test_re, text=self.mock_response) li = self.get_fl().transport_response_simplified( self.get_api_response(self.first_index) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries_per_page) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() li = fl.transport_response_simplified( self.get_api_response(self.first_index)) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in ['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.test_re, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(self.first_index) self.assertEqual(sleepmock.call_count, 2) + @requests_mock.Mocker() + def test_request_headers(self, http_mocker): + fl = self.create_fl_with_db(http_mocker) + fl.run() + self.assertNotEqual(len(http_mocker.request_history), 0) + for request in 
http_mocker.request_history: + assert 'User-Agent' in request.headers + user_agent = request.headers['User-Agent'] + assert 'Software Heritage Lister' in user_agent + assert swh.lister.__version__ in user_agent + def scheduled_tasks_test(self, next_api_response_file, next_last_index, http_mocker): """Check that no loading tasks get disabled when processing a new page of repositories returned by a forge API """ fl = self.create_fl_with_db(http_mocker) # process first page of repositories listing fl.run() # process second page of repositories listing prev_last_index = self.last_index self.first_index = self.last_index self.last_index = next_last_index self.good_api_response_file = next_api_response_file fl.run(min_bound=prev_last_index) # check expected number of ingested repos and loading tasks ingested_repos = list(fl.db_query_range(0, self.last_index)) self.assertEqual(len(ingested_repos), len(self.scheduler_tasks)) self.assertEqual(len(ingested_repos), 2 * self.entries_per_page) # check tasks are not disabled for task in self.scheduler_tasks: self.assertTrue(task['status'] != 'disabled') class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.simple_lister.SimpleLister` See :class:`swh.lister.pypi.tests.test_lister` for an example of how to customize for a specific listing service. """ entries = AbstractAttribute( 'Number of results ' 'in good response') # type: Union[AbstractAttribute, int] PAGE = AbstractAttribute( "URL of the server api's unique page to retrieve and " "parse for information") # type: Union[AbstractAttribute, str] def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl).
""" if override_config or self.fl is None: self.fl = self.Lister( override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) response_file = self.good_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.PAGE, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(0) self.assertEqual(sleepmock.call_count, 2) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.PAGE, text=self.mock_response) fl = self.get_fl() li = fl.list_packages(self.get_api_response(0)) li = fl.transport_response_simplified(li) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in ['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of packages listed by the lister """ http_mocker.get(self.PAGE, text=self.mock_response) li = self.get_fl().list_packages( self.get_api_response(0) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries) diff --git a/swh/lister/github/lister.py b/swh/lister/github/lister.py index 63462c1..30e686e 100644 --- a/swh/lister/github/lister.py +++ b/swh/lister/github/lister.py @@ -1,68 +1,73 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU 
General Public License version 3, or any later version # See top-level LICENSE file for more information import re import time from typing import Any from swh.lister.core.indexing_lister import IndexingHttpLister from swh.lister.github.models import GitHubModel class GitHubLister(IndexingHttpLister): PATH_TEMPLATE = '/repositories?since=%d' MODEL = GitHubModel DEFAULT_URL = 'https://api.github.com' API_URL_INDEX_RE = re.compile(r'^.*/repositories\?since=(\d+)') LISTER_NAME = 'github' instance = 'github' # There is only 1 instance of such lister default_min_bound = 0 # type: Any def get_model_from_repo(self, repo): return { 'uid': repo['id'], 'indexable': repo['id'], 'name': repo['name'], 'full_name': repo['full_name'], 'html_url': repo['html_url'], 'origin_url': repo['html_url'], 'origin_type': 'git', 'fork': repo['fork'], } def transport_quota_check(self, response): x_rate_limit_remaining = response.headers.get('X-RateLimit-Remaining') if not x_rate_limit_remaining: return False, 0 reqs_remaining = int(x_rate_limit_remaining) if response.status_code == 403 and reqs_remaining == 0: reset_at = int(response.headers['X-RateLimit-Reset']) delay = min(reset_at - time.time(), 3600) return True, delay return False, 0 def get_next_target_from_response(self, response): if 'next' in response.links: next_url = response.links['next']['url'] return int(self.API_URL_INDEX_RE.match(next_url).group(1)) def transport_response_simplified(self, response): repos = response.json() return [self.get_model_from_repo(repo) for repo in repos] def request_headers(self): - return {'Accept': 'application/vnd.github.v3+json'} + """(Override) Set requests headers to send when querying the GitHub API + + """ + headers = super().request_headers() + headers['Accept'] = 'application/vnd.github.v3+json' + return headers def disable_deleted_repo_tasks(self, index, next_index, keep_these): """ (Overrides) Fix provided index value to avoid erroneously disabling some scheduler tasks """ # Next listed 
class NpmListerBase(IndexingHttpLister):
    """List packages available in the npm registry in a paginated way"""
    MODEL = NpmModel
    LISTER_NAME = 'npm'
    instance = 'npm'

    def __init__(self, url='https://replicate.npmjs.com', per_page=1000,
                 override_config=None):
        super().__init__(url=url, override_config=override_config)
        # ask for one extra document per page: it serves as the start key
        # of the next page and is dropped from the listed results
        self.per_page = 1 + per_page
        self.PATH_TEMPLATE = self.PATH_TEMPLATE + '&limit=%s' % self.per_page

    @property
    def ADDITIONAL_CONFIG(self):
        """(Override) Add extra configuration"""
        config = super().ADDITIONAL_CONFIG
        config['loading_task_policy'] = ('str', 'recurring')
        return config

    def get_model_from_repo(self, repo_name):
        """(Override) Transform from npm package name to model"""
        package_url, package_metadata_url = self._compute_urls(repo_name)
        return {
            'uid': repo_name,
            'indexable': repo_name,
            'name': repo_name,
            'full_name': repo_name,
            'html_url': package_metadata_url,
            'origin_url': package_url,
            'origin_type': 'npm',
        }

    def task_dict(self, origin_type, origin_url, **kwargs):
        """(Override) Return task dict for loading a npm package into the
        archive.

        This is overridden from the lister_base as more information is
        needed for the ingestion task creation.
        """
        return create_task_dict(
            'load-%s' % origin_type,
            self.config['loading_task_policy'],
            package_name=kwargs.get('name'),
            package_url=origin_url,
            package_metadata_url=kwargs.get('html_url'))

    def request_headers(self):
        """(Override) Set requests headers to send when querying the npm
        registry.
        """
        return {**super().request_headers(), 'Accept': 'application/json'}

    def _compute_urls(self, repo_name):
        """Return a tuple (package_url, package_metadata_url)"""
        package_url = 'https://www.npmjs.com/package/%s' % repo_name
        # the metadata url must be escaped: a scoped package name contains
        # a '/' and would otherwise make the request fail
        metadata_url = '%s/%s' % (self.url, quote(repo_name, safe=''))
        return package_url, metadata_url

    def string_pattern_check(self, inner, lower, upper=None):
        """(Override) Inhibited: indices correspond to package names and
        thus do not respect any kind of fixed-length string pattern
        """
        pass


class NpmLister(NpmListerBase):
    """List all packages available in the npm registry in a paginated way
    """
    PATH_TEMPLATE = '/_all_docs?startkey="%s"'

    def get_next_target_from_response(self, response):
        """(Override) Get next npm package name to continue the listing"""
        rows = response.json()['rows']
        if len(rows) != self.per_page:
            return None
        return rows[-1]['id']

    def transport_response_simplified(self, response):
        """(Override) Transform npm registry response to list for model
        manipulation
        """
        rows = response.json()['rows']
        if len(rows) == self.per_page:
            # the extra trailing row is only the next page's start key
            rows = rows[:-1]
        return [self.get_model_from_repo(row['id']) for row in rows]


class NpmIncrementalLister(NpmListerBase):
    """List packages in the npm registry, updated since a specific
    update_seq value of the underlying CouchDB database, in a paginated
    way.
    """
    PATH_TEMPLATE = '/_changes?since=%s'

    @property
    def CONFIG_BASE_FILENAME(self):  # noqa: N802
        return 'lister_npm_incremental'

    def get_next_target_from_response(self, response):
        """(Override) Get next npm package name to continue the listing."""
        results = response.json()['results']
        if len(results) != self.per_page:
            return None
        return results[-1]['seq']

    def transport_response_simplified(self, response):
        """(Override) Transform npm registry response to list for model
        manipulation.
        """
        results = response.json()['results']
        if len(results) == self.per_page:
            results = results[:-1]
        return [self.get_model_from_repo(result['id'])
                for result in results]

    def filter_before_inject(self, models_list):
        """(Override) Filter out documents in the CouchDB database not
        related to a npm package.
        """
        # '_design/...' documents are CouchDB internals, not packages
        return [model for model in models_list
                if not model['name'].startswith('_design/')]

    def disable_deleted_repo_tasks(self, start, end, keep_these):
        """(Override) Disabled: not relevant in this incremental lister
        context, and the index type differs (int instead of str).
        """
        pass
@contextmanager
def mock_save(lister):
    """No-op stand-in for the registry state-saving context manager."""
    yield


def test_ping(swh_app, celery_session_worker):
    result = swh_app.send_task('swh.lister.npm.tasks.ping')
    assert result
    result.wait()
    assert result.successful()
    assert result.result == 'OK'


@patch('swh.lister.npm.tasks.save_registry_state')
@patch('swh.lister.npm.tasks.NpmLister')
def test_lister(lister, save, swh_app, celery_session_worker):
    # make the mocked NpmLister return itself and do nothing on run()
    lister.return_value = lister
    lister.run.return_value = None
    save.side_effect = mock_save

    result = swh_app.send_task('swh.lister.npm.tasks.NpmListerTask')
    assert result
    result.wait()
    assert result.successful()

    lister.assert_called_once_with()
    lister.run.assert_called_once_with()


@patch('swh.lister.npm.tasks.save_registry_state')
@patch('swh.lister.npm.tasks.get_last_update_seq')
@patch('swh.lister.npm.tasks.NpmIncrementalLister')
def test_incremental(lister, seq, save, swh_app, celery_session_worker):
    # make the mocked NpmIncrementalLister return itself and resume
    # the listing from update sequence 42
    lister.return_value = lister
    lister.run.return_value = None
    seq.return_value = 42
    save.side_effect = mock_save

    result = swh_app.send_task(
        'swh.lister.npm.tasks.NpmIncrementalListerTask')
    assert result
    result.wait()
    assert result.successful()

    lister.assert_called_once_with()
    seq.assert_called_once_with(lister)
    lister.run.assert_called_once_with(min_bound=42)
class PhabricatorLister(IndexingHttpLister):
    """Lister for repositories hosted on a Phabricator forge, indexed by
    their numeric repository id.
    """
    PATH_TEMPLATE = '?order=oldest&attachments[uris]=1&after=%s'
    DEFAULT_URL = \
        'https://forge.softwareheritage.org/api/diffusion.repository.search'
    MODEL = PhabricatorModel
    LISTER_NAME = 'phabricator'

    def __init__(self, url=None, instance=None, override_config=None):
        super().__init__(url=url, override_config=override_config)
        # default the instance name to the hostname of the api url
        self.instance = instance or urllib.parse.urlparse(self.url).hostname

    def request_params(self, identifier):
        """Override the default params behavior to retrieve the api token

        Credentials are stored as:

            credentials:
              phabricator:
                <instance>:
                  - username: <account>
                    password: <api token>
        """
        creds = self.request_instance_credentials()
        if not creds:
            raise ValueError(
                'Phabricator forge needs authentication credential to list.')
        # pick one credential at random to spread the API quota usage
        api_token = random.choice(creds)['password']
        return {'headers': self.request_headers() or {},
                'params': {'api.token': api_token}}

    def request_headers(self):
        """ (Override) Set requests headers to send when querying the
        Phabricator API
        """
        return {**super().request_headers(), 'Accept': 'application/json'}

    def get_model_from_repo(self, repo):
        """Convert one Phabricator repository dict to the lister model,
        or None when no clone url could be determined."""
        clone_url = get_repo_url(repo['attachments']['uris']['uris'])
        if clone_url is None:
            return None
        fields = repo['fields']
        return {
            'uid': clone_url,
            'indexable': repo['id'],
            'name': fields['shortName'],
            'full_name': fields['name'],
            'html_url': clone_url,
            'origin_url': clone_url,
            'origin_type': fields['vcs'],
            'instance': self.instance,
        }

    def get_next_target_from_response(self, response):
        """Extract the next 'after' cursor, or None at the last page."""
        cursor = response.json()['result']['cursor']
        after = cursor['after']
        if after and after != 'null':
            return int(after)

    def transport_response_simplified(self, response):
        """Turn one API page into a list of model dicts (or Nones)."""
        payload = response.json()
        if payload['result'] is None:
            raise ValueError('Problem during information fetch: %s'
                             % payload['error_code'])
        return [self.get_model_from_repo(repo)
                for repo in payload['result']['data']]

    def filter_before_inject(self, models_list):
        """
        (Overrides) IndexingLister.filter_before_inject
        Bounds query results by this Lister's set max_index.
        """
        # drop repositories for which no clone url was found
        kept = [model for model in models_list if model is not None]
        return super().filter_before_inject(kept)

    def disable_deleted_repo_tasks(self, index, next_index, keep_these):
        """
        (Overrides) Fix provided index value to avoid:

            - database query error
            - erroneously disabling some scheduler tasks
        """
        # The first call to the Phabricator API uses an empty 'after'
        # parameter: map it to 0 to avoid a database query error.
        # Otherwise, listed repository ids are strictly greater than the
        # 'after' parameter, so shift by one to keep the latest created
        # task enabled when processing a new page.
        fixed_index = 0 if index == '' else index + 1
        return super().disable_deleted_repo_tasks(fixed_index, next_index,
                                                  keep_these)

    def db_first_index(self):
        """
        (Overrides) Filter results by Phabricator instance

        Returns:
            the smallest indexable value of all repos in the db
        """
        row = (self.db_session
               .query(func.min(self.MODEL.indexable))
               .filter(self.MODEL.instance == self.instance)
               .first())
        if row:
            return row[0]

    def db_last_index(self):
        """
        (Overrides) Filter results by Phabricator instance

        Returns:
            the largest indexable value of all instance repos in the db
        """
        row = (self.db_session
               .query(func.max(self.MODEL.indexable))
               .filter(self.MODEL.instance == self.instance)
               .first())
        if row:
            return row[0]

    def db_query_range(self, start, end):
        """
        (Overrides) Filter the results by the Phabricator instance to
        avoid disabling loading tasks for repositories hosted on a
        different instance.

        Returns:
            a list of sqlalchemy.ext.declarative.declarative_base objects
            with indexable values within the given range for the instance
        """
        unfiltered = super().db_query_range(start, end)
        return unfiltered.filter(self.MODEL.instance == self.instance)
def get_repo_url(attachments):
    """
    Return url for a hosted repository from its uris attachments according
    to the following priority lists:
    * protocol: https > http
    * identifier: shortname > callsign > id
    """
    # candidates[protocol][identifier] -> url
    candidates = defaultdict(dict)
    for attachment in attachments:
        builtin = attachment['fields']['builtin']
        effective_url = attachment['fields']['uri']['effective']
        scheme = builtin['protocol']
        if scheme in ('http', 'https'):
            candidates[scheme][builtin['identifier']] = effective_url
        elif scheme is None:
            # protocol not reported: infer it from the url prefix
            matched = next((s for s in ('https', 'http')
                            if effective_url.startswith(s)), None)
            if matched is not None:
                candidates[matched]['undefined'] = effective_url
    # pick the best candidate by protocol first, then identifier
    for scheme in ('https', 'http'):
        by_identifier = candidates.get(scheme)
        if not by_identifier:
            continue
        for identifier in ('shortname', 'callsign', 'id', 'undefined'):
            if identifier in by_identifier:
                return by_identifier[identifier]
    return None