diff --git a/.gitignore b/.gitignore index 0fe4dd9..a7b82b0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,13 @@ *.pyc *.sw? *~ /.coverage /.coverage.* .eggs/ __pycache__ build/ dist/ *.egg-info version.txt -swh/lister/_version.py .tox/ .mypy_cache/ diff --git a/setup.py b/setup.py index 823747f..905d1f4 100755 --- a/setup.py +++ b/setup.py @@ -1,81 +1,81 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, 'README.md'), encoding='utf-8') as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = 'requirements-%s.txt' % name else: reqf = 'requirements.txt' requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements setup( name='swh.lister', description='Software Heritage lister', long_description=long_description, long_description_content_type='text/markdown', author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/DLSGH/', packages=find_packages(), install_requires=parse_requirements() + parse_requirements('swh'), tests_require=parse_requirements('test'), setup_requires=['vcversioner'], extras_require={'testing': parse_requirements('test')}, - vcversioner={'version_module_paths': ['swh/lister/_version.py']}, + vcversioner={}, include_package_data=True, entry_points=''' [swh.cli.subcommands] lister=swh.lister.cli:lister [swh.workers] lister.bitbucket=swh.lister.bitbucket:register lister.cgit=swh.lister.cgit:register lister.cran=swh.lister.cran:register lister.debian=swh.lister.debian:register lister.github=swh.lister.github:register lister.gitlab=swh.lister.gitlab:register lister.gnu=swh.lister.gnu:register lister.npm=swh.lister.npm:register lister.packagist=swh.lister.packagist:register lister.phabricator=swh.lister.phabricator:register lister.pypi=swh.lister.pypi:register ''', classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', 'Funding': 'https://www.softwareheritage.org/donate', 'Source': 'https://forge.softwareheritage.org/source/swh-lister', }, ) diff --git a/swh/lister/__init__.py b/swh/lister/__init__.py index 5697901..600c231 100644 --- a/swh/lister/__init__.py +++ b/swh/lister/__init__.py @@ -1,43 +1,49 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import pkg_resources logger = logging.getLogger(__name__) +try: + __version__ = pkg_resources.get_distribution('swh.lister').version +except pkg_resources.DistributionNotFound: + __version__ = 'devel' + + LISTERS = {entry_point.name.split('.', 1)[1]: entry_point for entry_point in pkg_resources.iter_entry_points('swh.workers') if entry_point.name.split('.', 1)[0] == 'lister'} SUPPORTED_LISTERS = list(LISTERS) def get_lister(lister_name, db_url=None, **conf): """Instantiate a lister given its name. Args: lister_name (str): Lister's name conf (dict): Configuration dict (lister db cnx, policy, priority...) Returns: Tuple (instantiated lister, drop_tables function, init schema function, insert minimum data function) """ if lister_name not in LISTERS: raise ValueError( 'Invalid lister %s: only supported listers are %s' % (lister_name, SUPPORTED_LISTERS)) if db_url: conf['lister'] = {'cls': 'local', 'args': {'db': db_url}} registry_entry = LISTERS[lister_name].load()() lister_cls = registry_entry['lister'] lister = lister_cls(override_config=conf) return lister diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index f7a62c4..93b1038 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,235 +1,232 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict from typing import Optional, Union -try: - from swh.lister._version import __version__ -except ImportError: - __version__ = 'devel' +from swh.lister import __version__ from .abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class ListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with ListerBase or a subclass of it. """ DEFAULT_URL = None # type: Optional[str] PATH_TEMPLATE = \ AbstractAttribute( 'string containing a python string format pattern that produces' ' the API endpoint path for listing stored repositories when given' ' an index, e.g., "/repositories?after=%s". To be implemented in' ' the API-specific class inheriting this.' ) # type: Union[AbstractAttribute, Optional[str]] EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } def request_instance_credentials(self): """Returns dictionary of any credentials configuration needed by the forge instance to list. The 'credentials' configuration is expected to be a dict of multiple levels. The first level is the lister's name, the second is the lister's instance name, which value is expected to be a list of credential structures (typically a couple username/password). For example: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... Returns: list of credential dicts for the current lister. """ all_creds = self.config.get('credentials') if not all_creds: return [] lister_creds = all_creds.get(self.LISTER_NAME, {}) creds = lister_creds.get(self.instance, []) return creds def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.url + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. This uses credentials if any are provided (see request_instance_credentials). MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} creds = self.request_instance_credentials() if not creds: return params auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements ListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, url=None): if not url: url = self.config.get('url') if not url: url = self.DEFAULT_URL if not url: raise NameError('HTTP Lister Transport requires an url.') self.url = url # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) logger.debug('path: %s', path) logger.debug('params: %s', params) logger.debug('method: %s', method) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: logger.warning('Failed to fetch %s: %s', path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements ListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements ListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(ListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with ListerBase or a subclass of it. """ PAGE = AbstractAttribute( "URL of the API's unique page to retrieve and parse " "for information") # type: Union[AbstractAttribute, str] PATH_TEMPLATE = None # we do not use it def __init__(self, url=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py index f19bdd2..8d8d6ba 100644 --- a/swh/lister/core/tests/test_lister.py +++ b/swh/lister/core/tests/test_lister.py @@ -1,427 +1,433 @@ # Copyright (C) 2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import datetime import time from unittest import TestCase from unittest.mock import Mock, patch import requests_mock from sqlalchemy import create_engine from typing import Any, Callable, Optional, Pattern, Type, Union +import swh.lister from swh.lister.core.abstractattribute import AbstractAttribute from swh.lister.tests.test_utils import init_db def noop(*args, **kwargs): pass +def test_version_generation(): + assert swh.lister.__version__ != 'devel', \ + "Make sure swh.lister is installed (e.g. pip install -e .)" + + class HttpListerTesterBase(abc.ABC): """Testing base class for listers. This contains methods for both :class:`HttpSimpleListerTester` and :class:`HttpListerTester`. See :class:`swh.lister.gitlab.tests.test_lister` for an example of how to customize for a specific listing service. """ Lister = AbstractAttribute( 'Lister class to test') # type: Union[AbstractAttribute, Type[Any]] lister_subdir = AbstractAttribute( 'bitbucket, github, etc.') # type: Union[AbstractAttribute, str] good_api_response_file = AbstractAttribute( 'Example good response body') # type: Union[AbstractAttribute, str] LISTER_NAME = 'fake-lister' # May need to override this if the headers are used for something def response_headers(self, request): return {} # May need to override this if the server uses non-standard rate limiting # method. # Please keep the requested retry delay reasonably low. def mock_rate_quota(self, n, request, context): self.rate_limit += 1 context.status_code = 429 context.headers['Retry-After'] = '1' return '{"error":"dummy"}' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rate_limit = 1 self.response = None self.fl = None self.helper = None self.scheduler_tasks = [] if self.__class__ != HttpListerTesterBase: self.run = TestCase.run.__get__(self, self.__class__) else: self.run = noop def mock_limit_n_response(self, n, request, context): self.fl.reset_backoff() if self.rate_limit <= n: return self.mock_rate_quota(n, request, context) else: return self.mock_response(request, context) def mock_limit_twice_response(self, request, context): return self.mock_limit_n_response(2, request, context) def get_api_response(self, identifier): fl = self.get_fl() if self.response is None: self.response = fl.safely_issue_request(identifier) return self.response def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). """ if override_config or self.fl is None: self.fl = self.Lister(url='https://fakeurl', override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() self.scheduler_tasks = [] return self.fl def disable_scheduler(self, fl): fl.schedule_missing_tasks = Mock(return_value=None) def mock_scheduler(self, fl): def _create_tasks(tasks): task_id = 0 current_nb_tasks = len(self.scheduler_tasks) if current_nb_tasks > 0: task_id = self.scheduler_tasks[-1]['id'] + 1 for task in tasks: scheduler_task = dict(task) scheduler_task.update({ 'status': 'next_run_not_scheduled', 'retries_left': 0, 'priority': None, 'id': task_id, 'current_interval': datetime.timedelta(days=64) }) self.scheduler_tasks.append(scheduler_task) task_id = task_id + 1 return self.scheduler_tasks[current_nb_tasks:] def _disable_tasks(task_ids): for task_id in task_ids: self.scheduler_tasks[task_id]['status'] = 'disabled' fl.scheduler.create_tasks = Mock(wraps=_create_tasks) fl.scheduler.disable_tasks = Mock(wraps=_disable_tasks) def disable_db(self, fl): fl.winnow_models = Mock(return_value=[]) fl.db_inject_repo = Mock(return_value=fl.MODEL()) fl.disable_deleted_repo_tasks = Mock(return_value=None) def init_db(self, db, model): engine = create_engine(db.url()) model.metadata.create_all(engine) @requests_mock.Mocker() def test_is_within_bounds(self, http_mocker): fl = self.get_fl() self.assertFalse(fl.is_within_bounds(1, 2, 3)) self.assertTrue(fl.is_within_bounds(2, 1, 3)) self.assertTrue(fl.is_within_bounds(1, 1, 1)) self.assertTrue(fl.is_within_bounds(1, None, None)) self.assertTrue(fl.is_within_bounds(1, None, 2)) self.assertTrue(fl.is_within_bounds(1, 0, None)) self.assertTrue(fl.is_within_bounds("b", "a", "c")) self.assertFalse(fl.is_within_bounds("a", "b", "c")) self.assertTrue(fl.is_within_bounds("a", None, "c")) self.assertTrue(fl.is_within_bounds("a", None, None)) self.assertTrue(fl.is_within_bounds("b", "a", None)) self.assertFalse(fl.is_within_bounds("a", "b", None)) self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03")) self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03")) with self.assertRaises(TypeError): fl.is_within_bounds(1.0, "b", None) with self.assertRaises(TypeError): fl.is_within_bounds("A:B", "A::B", None) class HttpListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.indexing_lister.IndexingHttpLister` See :class:`swh.lister.github.tests.test_gh_lister` for an example of how to customize for a specific listing service. """ last_index = AbstractAttribute( 'Last index ' 'in good_api_response') # type: Union[AbstractAttribute, int] first_index = AbstractAttribute( 'First index in ' ' good_api_response') # type: Union[AbstractAttribute, Optional[int]] bad_api_response_file = AbstractAttribute( 'Example bad response body') # type: Union[AbstractAttribute, str] entries_per_page = AbstractAttribute( 'Number of results in ' 'good response') # type: Union[AbstractAttribute, int] test_re = AbstractAttribute( 'Compiled regex matching the server url. Must capture the ' 'index value.') # type: Union[AbstractAttribute, Pattern] convert_type = str # type: Callable[..., Any] """static method used to convert the "request_index" to its right type (for indexing listers for example, this is in accordance with the model's "indexable" column). """ def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) req_index = self.request_index(request) if req_index == self.first_index: response_file = self.good_api_response_file else: response_file = self.bad_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() def request_index(self, request): m = self.test_re.search(request.path_url) if m and (len(m.groups()) > 0): return self.convert_type(m.group(1)) def create_fl_with_db(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) db = init_db() fl = self.get_fl(override_config={ 'lister': { 'cls': 'local', 'args': {'db': db.url()} } }) fl.db = db self.init_db(db, fl.MODEL) self.mock_scheduler(fl) return fl @requests_mock.Mocker() def test_fetch_no_bounds_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run() self.assertEqual(fl.db_last_index(), self.last_index) ingested_repos = list(fl.db_query_range(self.first_index, self.last_index)) self.assertEqual(len(ingested_repos), self.entries_per_page) @requests_mock.Mocker() def test_fetch_multiple_pages_yesdb(self, http_mocker): fl = self.create_fl_with_db(http_mocker) fl.run(min_bound=self.first_index) self.assertEqual(fl.db_last_index(), self.last_index) partitions = fl.db_partition_indices(5) self.assertGreater(len(partitions), 0) for k in partitions: self.assertLessEqual(len(k), 5) self.assertGreater(len(k), 0) @requests_mock.Mocker() def test_fetch_none_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=1, max_bound=1) # stores no results # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_one_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index, max_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_fetch_multiple_pages_nodb(self, http_mocker): http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() self.disable_scheduler(fl) self.disable_db(fl) fl.run(min_bound=self.first_index) # FIXME: Determine what this method tries to test and add checks to # actually test @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of repos listed by the lister """ http_mocker.get(self.test_re, text=self.mock_response) li = self.get_fl().transport_response_simplified( self.get_api_response(self.first_index) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries_per_page) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.test_re, text=self.mock_response) fl = self.get_fl() li = fl.transport_response_simplified( self.get_api_response(self.first_index)) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in ['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.test_re, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(self.first_index) self.assertEqual(sleepmock.call_count, 2) def scheduled_tasks_test(self, next_api_response_file, next_last_index, http_mocker): """Check that no loading tasks get disabled when processing a new page of repositories returned by a forge API """ fl = self.create_fl_with_db(http_mocker) # process first page of repositories listing fl.run() # process second page of repositories listing prev_last_index = self.last_index self.first_index = self.last_index self.last_index = next_last_index self.good_api_response_file = next_api_response_file fl.run(min_bound=prev_last_index) # check expected number of ingested repos and loading tasks ingested_repos = list(fl.db_query_range(0, self.last_index)) self.assertEqual(len(ingested_repos), len(self.scheduler_tasks)) self.assertEqual(len(ingested_repos), 2 * self.entries_per_page) # check tasks are not disabled for task in self.scheduler_tasks: self.assertTrue(task['status'] != 'disabled') class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): """Base testing class for subclass of :class:`swh.lister.core.simple)_lister.SimpleLister` See :class:`swh.lister.pypi.tests.test_lister` for an example of how to customize for a specific listing service. """ entries = AbstractAttribute( 'Number of results ' 'in good response') # type: Union[AbstractAttribute, int] PAGE = AbstractAttribute( "URL of the server api's unique page to retrieve and " "parse for information") # type: Union[AbstractAttribute, str] def get_fl(self, override_config=None): """Retrieve an instance of fake lister (fl). """ if override_config or self.fl is None: self.fl = self.Lister( override_config=override_config) self.fl.INITIAL_BACKOFF = 1 self.fl.reset_backoff() return self.fl def mock_response(self, request, context): self.fl.reset_backoff() self.rate_limit = 1 context.status_code = 200 custom_headers = self.response_headers(request) context.headers.update(custom_headers) response_file = self.good_api_response_file with open('swh/lister/%s/tests/%s' % (self.lister_subdir, response_file), 'r', encoding='utf-8') as r: return r.read() @requests_mock.Mocker() def test_api_request(self, http_mocker): """Test API request for rate limit handling """ http_mocker.get(self.PAGE, text=self.mock_limit_twice_response) with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: self.get_api_response(0) self.assertEqual(sleepmock.call_count, 2) @requests_mock.Mocker() def test_model_map(self, http_mocker): """Check if all the keys of model are present in the model created by the `transport_response_simplified` """ http_mocker.get(self.PAGE, text=self.mock_response) fl = self.get_fl() li = fl.list_packages(self.get_api_response(0)) li = fl.transport_response_simplified(li) di = li[0] self.assertIsInstance(di, dict) pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')] for k in pubs: if k not in ['last_seen', 'task_id', 'id']: self.assertIn(k, di) @requests_mock.Mocker() def test_repos_list(self, http_mocker): """Test the number of packages listed by the lister """ http_mocker.get(self.PAGE, text=self.mock_response) li = self.get_fl().list_packages( self.get_api_response(0) ) self.assertIsInstance(li, list) self.assertEqual(len(li), self.entries)