diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py
index d086382..f76c4fc 100644
--- a/swh/lister/core/indexing_lister.py
+++ b/swh/lister/core/indexing_lister.py
@@ -1,210 +1,210 @@
 # Copyright (C) 2015-2017 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import logging

 from sqlalchemy import func

 from .lister_transports import SWHListerHttpTransport
 from .lister_base import SWHListerBase


 class SWHIndexingLister(SWHListerBase):
     """Lister* intermediate class for any service that follows the pattern:

     - The service must report at least one stable unique identifier, known
       herein as the UID value, for every listed repository.
     - If the service splits the list of repositories into sublists, it must
       report at least one stable and sorted index identifier for every
       listed repository, known herein as the indexable value, which can be
       used as part of the service endpoint query to request a sublist
       beginning from that index. This might be the UID if the UID is
       monotonic.
     - Client sends a request to list repositories starting from a given
       index.
     - Client receives structured (json/xml/etc) response with information
       about a sequential series of repositories starting from that index
       and, if necessary/available, some indication of the URL or index for
       fetching the next series of repository data.

     See :class:`swh.lister.core.lister_base.SWHListerBase` for more details.

     This class cannot be instantiated. To create a new Lister for a source
     code listing service that follows the model described above, you must
     subclass this class and provide the required overrides in addition to
     any unmet implementation/override requirements of this class's base.
     (see parent class and member docstrings for details)

     Required Overrides::

         def get_next_target_from_response

     """

     @abc.abstractmethod
     def get_next_target_from_response(self, response):
         """Find the next server endpoint identifier given the entire
         response.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         Args:
             response (transport response): response page from the server

         Returns:
             index of next page, possibly extracted from a next href url

         """
         pass
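In a concrete subclass, this override typically just parses the next index out of the page. A minimal sketch, assuming a hypothetical JSON API that reports the next index in a 'next' field, with a fallback to a standard 'next' link header; neither detail comes from this patch:

    import json

    def get_next_target_from_response(self, response):
        # Prefer an explicit 'next' field in the JSON body (hypothetical API).
        body = json.loads(response.text)
        if body.get('next') is not None:
            return body['next']
        # Fall back to a 'next' link header, which some services send instead.
        link = response.links.get('next')
        if link:
            # e.g. https://service.example/repos?since=1234 -> '1234'
            return link['url'].rsplit('=', 1)[-1]
        return None  # no next page: the run loop further down will stop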

     # You probably don't need to override anything below this line.

     def filter_before_inject(self, models_list):
         """Overrides SWHListerBase.filter_before_inject

         Bounds query results by this Lister's set max_index.
         """
         models_list = [
             m for m in models_list
             if self.is_within_bounds(m['indexable'], None, self.max_index)
         ]
         return models_list

     def db_query_range(self, start, end):
         """Look in the db for a range of repositories with indexable
         values in the range [start, end]

         Args:
             start (model indexable type): start of desired indexable range
             end (model indexable type): end of desired indexable range

         Returns:
             a list of sqlalchemy.ext.declarative.declarative_base objects
             with indexable values within the given range

         """
         retlist = self.db_session.query(self.MODEL)
         if start is not None:
             retlist = retlist.filter(self.MODEL.indexable >= start)
         if end is not None:
             retlist = retlist.filter(self.MODEL.indexable <= end)
         return retlist

     def db_partition_indices(self, partition_size):
         """Describe an index-space compartmentalization of the db table
         in equal sized chunks. This is used to describe min & max bounds
         for parallelizing fetch tasks.

         Args:
             partition_size (int): desired size to make each partition

         Returns:
             a list of tuples (begin, end) of indexable value that declare
             approximately equal-sized ranges of existing repos

         """
         n = self.db_num_entries()

         partitions = []
         partition_size = min(partition_size, n)
         prev_index = None
         for i in range(0, n-1, partition_size):
             # indexable column from the ith row
             index = self.db_session.query(self.MODEL.indexable) \
                 .order_by(self.MODEL.indexable).offset(i).first()

             if index is not None and prev_index is not None:
                 partitions.append((prev_index, index))
             prev_index = index

         partitions.append((prev_index, self.db_last_index()))

         return partitions

     def db_last_index(self):
         """Look in the db for the largest indexable value

         Returns:
             the largest indexable value of all repos in the db

         """
         t = self.db_session.query(func.max(self.MODEL.indexable)).first()
         if t:
             return t[0]
         else:
             return None
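To make the partitioning above concrete, here is a pure-Python analogue of db_partition_indices that samples a plain sorted list instead of the database; it is illustrative only and not part of this module:

    def partition_indices(sorted_indices, partition_size):
        """Pair every partition_size-th index with the next sample,
        mirroring db_partition_indices over an in-memory list."""
        n = len(sorted_indices)
        partition_size = min(partition_size, n)
        partitions = []
        prev_index = None
        for i in range(0, n - 1, partition_size):
            index = sorted_indices[i]
            if prev_index is not None:
                partitions.append((prev_index, index))
            prev_index = index
        # close the final range with the largest known index
        partitions.append((prev_index, sorted_indices[-1]))
        return partitions

    # e.g. partition_indices(list(range(25)), 10) == [(0, 10), (10, 20), (20, 24)]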

     def disable_deleted_repo_tasks(self, start, end, keep_these):
         """Disable tasks for repos that no longer exist between start and
         end.

         Args:
             start: beginning of range to disable
             end: end of range to disable
             keep_these (uid list): do not disable repos with uids in this
                 list

         """
         if end is None:
             end = self.db_last_index()

         if not self.is_within_bounds(end, None, self.max_index):
             end = self.max_index

         deleted_repos = self.winnow_models(
             self.db_query_range(start, end), self.MODEL.uid, keep_these
         )
         tasks_to_disable = [repo.task_id for repo in deleted_repos
                             if repo.task_id is not None]
         if tasks_to_disable:
             self.scheduler.disable_tasks(tasks_to_disable)
         for repo in deleted_repos:
             repo.task_id = None

-    def run(self, min_index=None, max_index=None):
+    def run(self, min_bound=None, max_bound=None):
         """Main entry function. Sequentially fetches repository data
         from the service according to the basic outline in the class
         docstring, continually fetching sublists until either there
         is no next index reference given or the given next index is
-        greater than the desired max_index.
+        greater than the desired max_bound.

         Args:
-            min_index (indexable type): optional index to start from
-            max_index (indexable type): optional index to stop at
+            min_bound (indexable type): optional index to start from
+            max_bound (indexable type): optional index to stop at

         Returns:
             nothing

         """
-        index = min_index or ''
+        index = min_bound or ''
         loop_count = 0
-        self.min_index = min_index
-        self.max_index = max_index
+        self.min_index = min_bound
+        self.max_index = max_bound

         while self.is_within_bounds(index, self.min_index, self.max_index):
             logging.info('listing repos starting at %s' % index)

             response, injected_repos = self.ingest_data(index)
             next_index = self.get_next_target_from_response(response)

             # Determine if any repos were deleted, and disable their tasks.
             keep_these = [k for k in injected_repos.keys()]
             self.disable_deleted_repo_tasks(index, next_index, keep_these)

             # termination condition

             if (next_index is None) or (next_index == index):
                 logging.info('stopping after index %s, no next link found' %
                              index)
                 break
             else:
                 index = next_index

             loop_count += 1
             if loop_count == 20:
                 logging.info('flushing updates')
                 loop_count = 0
                 self.db_session.commit()
                 self.db_session = self.mk_session()

         self.db_session.commit()
         self.db_session = self.mk_session()


 class SWHIndexingHttpLister(SWHListerHttpTransport, SWHIndexingLister):
     """Convenience class for ensuring right lookup and init order when
     combining SWHIndexingLister and SWHListerHttpTransport."""

     def __init__(self, api_baseurl=None, override_config=None):
         SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
         SWHIndexingLister.__init__(self, override_config=override_config)
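For reference, a concrete lister built on SWHIndexingHttpLister mainly supplies a model, a path template, and the response hooks. A rough sketch with hypothetical names (ExampleLister, ExampleModel and the endpoint shape are placeholders, not from this patch; compare the github/bitbucket listers):

    from swh.lister.core.indexing_lister import SWHIndexingHttpLister
    # ExampleModel would subclass the lister model base with an 'indexable'
    # column appropriate for the service (hypothetical module).
    from .models import ExampleModel


    class ExampleLister(SWHIndexingHttpLister):
        # server endpoint path: %d is filled in with the request index
        PATH_TEMPLATE = '/repositories?since=%d'
        MODEL = ExampleModel
        LISTER_NAME = 'example'

        def get_model_from_repo(self, repo):
            # map one repository dict from the API onto model columns
            return {
                'uid': repo['id'],
                'indexable': repo['id'],  # monotonic id doubles as the index
                'name': repo['name'],
                'full_name': repo['full_name'],
                'html_url': repo['html_url'],
                'origin_url': repo['html_url'],
                'origin_type': 'git',
                'description': repo.get('description'),
            }

        def get_next_target_from_response(self, response):
            # last id on the page is the next starting index (hypothetical)
            body = response.json()
            return body[-1]['id'] if body else None

        def transport_response_simplified(self, response):
            return [self.get_model_from_repo(repo) for repo in response.json()]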
diff --git a/swh/lister/core/paging_lister.py b/swh/lister/core/paging_lister.py
index 9779975..d65d4a4 100644
--- a/swh/lister/core/paging_lister.py
+++ b/swh/lister/core/paging_lister.py
@@ -1,136 +1,136 @@
 # Copyright (C) 2015-2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import logging

 from .lister_transports import SWHListerHttpTransport
 from .lister_base import SWHListerBase


 class PageByPageLister(SWHListerBase):
     """Lister* intermediate class for any service that follows the simple
     pagination page pattern.

     - Client sends a request to list repositories starting from a given
       page identifier.

     - Client receives structured (json/xml/etc) response with information
       about a sequential series of repositories (per page) starting from
       a given index. And, if available, some indication of the next page
       index for fetching the remaining repository data.

     See :class:`swh.lister.core.lister_base.SWHListerBase` for more
     details.

     This class cannot be instantiated. To create a new Lister for a
     source code listing service that follows the model described above,
     you must subclass this class. Then provide the required overrides in
     addition to any unmet implementation/override requirements of this
     class's base (see parent class and member docstrings for details).

     Required Overrides::

         def get_next_target_from_response

     """

     @abc.abstractmethod
     def get_next_target_from_response(self, response):
         """Find the next server endpoint page given the entire response.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         For example, some APIs can use the Link headers to provide the
         next page.

         Args:
             response (transport response): response page from the server

         Returns:
             index of next page, possibly extracted from a next href url

         """
         pass

     @abc.abstractmethod
     def get_pages_information(self):
         """Find the total number of pages.

         Implementation of this method depends on the server API spec and
         the shape of the network response object returned by the
         transport_request method.

         For example, some APIs can use dedicated headers:
         - x-total-pages to provide the total number of pages
         - x-total to provide the total number of repositories
         - x-per-page to provide the number of elements per page

         Returns:
             tuple (total number of repositories, total number of pages,
             per_page)

         """
         pass

     # You probably don't need to override anything below this line.

-    def run(self, min_index=None, max_index=None):
+    def run(self, min_bound=None, max_bound=None):
         """Main entry function. Sequentially fetches repository data
         from the service according to the basic outline in the class
         docstring, continually fetching sublists until either there
-        is no next index reference given or the given next index is
-        greater than the desired max_index.
+        is no next page reference given or the given next page is
+        greater than the desired max_page.

         Args:
-            min_index (indexable type): optional index to start from
-            max_index (indexable type): optional index to stop at
+            min_bound: optional page to start from
+            max_bound: optional page to stop at

         Returns:
             nothing

         """
-        index = min_index or ''
+        page = min_bound or ''
         loop_count = 0
-        self.min_index = min_index
-        self.max_index = max_index
+        self.min_page = min_bound
+        self.max_page = max_bound

-        while self.is_within_bounds(index, self.min_index, self.max_index):
-            logging.info('listing repos starting at %s' % index)
+        while self.is_within_bounds(page, self.min_page, self.max_page):
+            logging.info('listing repos starting at %s' % page)

-            response, injected_repos = self.ingest_data(index)
-            next_index = self.get_next_target_from_response(response)
+            response, injected_repos = self.ingest_data(page)
+            next_page = self.get_next_target_from_response(response)

             # termination condition

-            if (next_index is None) or (next_index == index):
-                logging.info('stopping after index %s, no next link found' %
-                             index)
+            if (next_page is None) or (next_page == page):
+                logging.info('stopping after page %s, no next link found' %
+                             page)
                 break
             else:
-                index = next_index
+                page = next_page

             loop_count += 1
             if loop_count == 20:
                 logging.info('flushing updates')
                 loop_count = 0
                 self.db_session.commit()
                 self.db_session = self.mk_session()

         self.db_session.commit()
         self.db_session = self.mk_session()


 class PageByPageHttpLister(SWHListerHttpTransport, PageByPageLister):
     """Convenience class for ensuring right lookup and init order when
     combining PageByPageLister and SWHListerHttpTransport.

     """
     def __init__(self, api_baseurl=None, override_config=None):
         SWHListerHttpTransport.__init__(self, api_baseurl=api_baseurl)
         PageByPageLister.__init__(self, override_config=override_config)
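A sketch of how the header convention named in the get_pages_information docstring could be consumed, written over a requests-style response object; only the header names come from the docstring above, the helper itself and the derived page count are assumptions:

    def pages_information_from(response):
        # Totals advertised by the service; any of these headers may be absent.
        h = response.headers
        total = int(h['x-total']) if 'x-total' in h else None
        total_pages = int(h['x-total-pages']) if 'x-total-pages' in h else None
        per_page = int(h['x-per-page']) if 'x-per-page' in h else None
        if total_pages is None and total is not None and per_page:
            # derive the page count when only the totals are given
            total_pages = -(-total // per_page)  # ceiling division
        return total, total_pages, per_page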
""" task_queue = AbstractAttribute('Celery Task queue name') @abc.abstractmethod def new_lister(self, *args, **kwargs): """Return a new lister of the appropriate type. """ pass @abc.abstractmethod def run_task(self, *args, **kwargs): pass # Paging/Indexing lister tasks derivatives # (cf. {github/bitbucket/gitlab}/tasks) class RangeListerTask(ListerTaskBase): """Range lister task. """ def run_task(self, start, end, *args, **kwargs): lister = self.new_lister(*args, **kwargs) - return lister.run(min_index=start, max_index=end) + return lister.run(min_bound=start, max_bound=end) # Indexing Lister tasks derivatives (cf. {github/bitbucket}/tasks) class IndexingDiscoveryListerTask(ListerTaskBase): """Incremental indexing lister task. """ def run_task(self, *args, **kwargs): lister = self.new_lister(*args, **kwargs) - return lister.run(min_index=lister.db_last_index(), max_index=None) + return lister.run(min_bound=lister.db_last_index(), max_bound=None) class IndexingRefreshListerTask(ListerTaskBase): """Full indexing lister task. """ GROUP_SPLIT = 10000 def run_task(self, *args, **kwargs): lister = self.new_lister(*args, **kwargs) ranges = lister.db_partition_indices(self.GROUP_SPLIT) random.shuffle(ranges) range_task = RangeListerTask() group(range_task.s(minv, maxv, *args, **kwargs) for minv, maxv in ranges)() diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py index 9d8f5e9..e4b8785 100644 --- a/swh/lister/core/tests/test_lister.py +++ b/swh/lister/core/tests/test_lister.py @@ -1,241 +1,241 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import time from unittest import TestCase from unittest.mock import Mock, patch import requests_mock from testing.postgresql import Postgresql from nose.tools import istest from sqlalchemy import create_engine from swh.lister.core.abstractattribute import AbstractAttribute def noop(*args, **kwargs): pass @requests_mock.Mocker() class HttpListerTesterBase(abc.ABC): """Base testing class for subclasses of swh.lister.core.indexing_lister.SWHIndexingHttpLister. swh.lister.core.paging_lister.PageByPageHttpLister See swh.lister.github.tests.test_gh_lister for an example of how to customize for a specific listing service. """ Lister = AbstractAttribute('The lister class to test') test_re = AbstractAttribute('Compiled regex matching the server url. Must' ' capture the index value.') lister_subdir = AbstractAttribute('bitbucket, github, etc.') good_api_response_file = AbstractAttribute('Example good response body') bad_api_response_file = AbstractAttribute('Example bad response body') first_index = AbstractAttribute('First index in good_api_response') last_index = AbstractAttribute('Last index in good_api_response') entries_per_page = AbstractAttribute('Number of results in good response') LISTER_NAME = 'fake-lister' # May need to override this if the headers are used for something def response_headers(self, request): return {} # May need to override this if the server uses non-standard rate limiting # method. # Please keep the requested retry delay reasonably low. 
diff --git a/swh/lister/core/tests/test_lister.py b/swh/lister/core/tests/test_lister.py
index 9d8f5e9..e4b8785 100644
--- a/swh/lister/core/tests/test_lister.py
+++ b/swh/lister/core/tests/test_lister.py
@@ -1,241 +1,241 @@
 # Copyright (C) 2017-2018 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import time
 from unittest import TestCase
 from unittest.mock import Mock, patch

 import requests_mock
 from testing.postgresql import Postgresql
 from nose.tools import istest
 from sqlalchemy import create_engine

 from swh.lister.core.abstractattribute import AbstractAttribute


 def noop(*args, **kwargs):
     pass


 @requests_mock.Mocker()
 class HttpListerTesterBase(abc.ABC):
     """Base testing class for subclasses of

            swh.lister.core.indexing_lister.SWHIndexingHttpLister
            swh.lister.core.paging_lister.PageByPageHttpLister

     See swh.lister.github.tests.test_gh_lister for an example of how
     to customize for a specific listing service.

     """
     Lister = AbstractAttribute('The lister class to test')
     test_re = AbstractAttribute('Compiled regex matching the server url. Must'
                                 ' capture the index value.')
     lister_subdir = AbstractAttribute('bitbucket, github, etc.')
     good_api_response_file = AbstractAttribute('Example good response body')
     bad_api_response_file = AbstractAttribute('Example bad response body')
     first_index = AbstractAttribute('First index in good_api_response')
     last_index = AbstractAttribute('Last index in good_api_response')
     entries_per_page = AbstractAttribute('Number of results in good response')
     LISTER_NAME = 'fake-lister'

     # May need to override this if the headers are used for something
     def response_headers(self, request):
         return {}

     # May need to override this if the server uses a non-standard rate
     # limiting method.
     # Please keep the requested retry delay reasonably low.
     def mock_rate_quota(self, n, request, context):
         self.rate_limit += 1
         context.status_code = 429
         context.headers['Retry-After'] = '1'
         return '{"error":"dummy"}'

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.rate_limit = 1
         self.response = None
         self.fl = None
         self.helper = None
         if self.__class__ != HttpListerTesterBase:
             self.run = TestCase.run.__get__(self, self.__class__)
         else:
             self.run = noop

     def request_index(self, request):
         m = self.test_re.search(request.path_url)
         if m and (len(m.groups()) > 0):
             return m.group(1)
         else:
             return None

     def mock_response(self, request, context):
         self.fl.reset_backoff()
         self.rate_limit = 1
         context.status_code = 200
         custom_headers = self.response_headers(request)
         context.headers.update(custom_headers)
         if self.request_index(request) == str(self.first_index):
             with open('swh/lister/%s/tests/%s' % (self.lister_subdir,
                                                   self.good_api_response_file),
                       'r', encoding='utf-8') as r:
                 return r.read()
         else:
             with open('swh/lister/%s/tests/%s' % (self.lister_subdir,
                                                   self.bad_api_response_file),
                       'r', encoding='utf-8') as r:
                 return r.read()

     def mock_limit_n_response(self, n, request, context):
         self.fl.reset_backoff()
         if self.rate_limit <= n:
             return self.mock_rate_quota(n, request, context)
         else:
             return self.mock_response(request, context)

     def mock_limit_once_response(self, request, context):
         return self.mock_limit_n_response(1, request, context)

     def mock_limit_twice_response(self, request, context):
         return self.mock_limit_n_response(2, request, context)

     def get_fl(self, override_config=None):
         """Retrieve an instance of fake lister (fl).

         """
         if override_config or self.fl is None:
             with patch(
                 'swh.scheduler.backend.SchedulerBackend.reconnect', noop
             ):
                 self.fl = self.Lister(api_baseurl='https://fakeurl',
                                       override_config=override_config)
             self.fl.INITIAL_BACKOFF = 1

         self.fl.reset_backoff()
         return self.fl

     def get_api_response(self):
         fl = self.get_fl()
         if self.response is None:
             self.response = fl.safely_issue_request(self.first_index)
         return self.response

     @istest
     def test_is_within_bounds(self, http_mocker):
         fl = self.get_fl()
         self.assertFalse(fl.is_within_bounds(1, 2, 3))
         self.assertTrue(fl.is_within_bounds(2, 1, 3))
         self.assertTrue(fl.is_within_bounds(1, 1, 1))
         self.assertTrue(fl.is_within_bounds(1, None, None))
         self.assertTrue(fl.is_within_bounds(1, None, 2))
         self.assertTrue(fl.is_within_bounds(1, 0, None))
         self.assertTrue(fl.is_within_bounds("b", "a", "c"))
         self.assertFalse(fl.is_within_bounds("a", "b", "c"))
         self.assertTrue(fl.is_within_bounds("a", None, "c"))
         self.assertTrue(fl.is_within_bounds("a", None, None))
         self.assertTrue(fl.is_within_bounds("b", "a", None))
         self.assertFalse(fl.is_within_bounds("a", "b", None))
         self.assertTrue(fl.is_within_bounds("aa:02", "aa:01", "aa:03"))
         self.assertFalse(fl.is_within_bounds("aa:12", None, "aa:03"))
         with self.assertRaises(TypeError):
             fl.is_within_bounds(1.0, "b", None)
         with self.assertRaises(TypeError):
             fl.is_within_bounds("A:B", "A::B", None)

     @istest
     def test_api_request(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_limit_twice_response)
         with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock:
             self.get_api_response()
         self.assertEqual(sleepmock.call_count, 2)

     @istest
     def test_repos_list(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         li = self.get_fl().transport_response_simplified(
             self.get_api_response()
         )
         self.assertIsInstance(li, list)
         self.assertEqual(len(li), self.entries_per_page)
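The 429-then-200 sequence that mock_rate_quota and mock_limit_n_response simulate can also be reproduced directly with requests_mock's response lists; a standalone sketch (the URL and bodies are illustrative):

    import requests
    import requests_mock

    with requests_mock.Mocker() as m:
        # First reply mimics mock_rate_quota: HTTP 429 plus a Retry-After
        # hint; the second reply is the normal page, as in
        # mock_limit_once_response.
        m.get('https://service.example/repos', [
            {'status_code': 429, 'headers': {'Retry-After': '1'},
             'text': '{"error":"dummy"}'},
            {'status_code': 200, 'text': '[]'},
        ])
        assert requests.get('https://service.example/repos').status_code == 429
        assert requests.get('https://service.example/repos').status_code == 200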

     @istest
     def test_model_map(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         fl = self.get_fl()
         li = fl.transport_response_simplified(self.get_api_response())
         di = li[0]
         self.assertIsInstance(di, dict)
         pubs = [k for k in vars(fl.MODEL).keys() if not k.startswith('_')]
         for k in pubs:
             if k not in ['last_seen', 'task_id', 'origin_id', 'id']:
                 self.assertIn(k, di)

     def disable_storage_and_scheduler(self, fl):
         fl.create_missing_origins_and_tasks = Mock(return_value=None)

     def disable_db(self, fl):
         fl.winnow_models = Mock(return_value=[])
         fl.db_inject_repo = Mock(return_value=fl.MODEL())
         fl.disable_deleted_repo_tasks = Mock(return_value=None)

     @istest
     def test_fetch_none_nodb(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         fl = self.get_fl()
         self.disable_storage_and_scheduler(fl)
         self.disable_db(fl)
-        fl.run(min_index=1, max_index=1)  # stores no results
+        fl.run(min_bound=1, max_bound=1)  # stores no results

     @istest
     def test_fetch_one_nodb(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         fl = self.get_fl()
         self.disable_storage_and_scheduler(fl)
         self.disable_db(fl)
-        fl.run(min_index=self.first_index, max_index=self.first_index)
+        fl.run(min_bound=self.first_index, max_bound=self.first_index)

     @istest
     def test_fetch_multiple_pages_nodb(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         fl = self.get_fl()
         self.disable_storage_and_scheduler(fl)
         self.disable_db(fl)
-        fl.run(min_index=self.first_index)
+        fl.run(min_bound=self.first_index)

     def init_db(self, db, model):
         engine = create_engine(db.url())
         model.metadata.create_all(engine)

     @istest
     def test_fetch_multiple_pages_yesdb(self, http_mocker):
         http_mocker.get(self.test_re, text=self.mock_response)
         initdb_args = Postgresql.DEFAULT_SETTINGS['initdb_args']
         initdb_args = ' '.join([initdb_args, '-E UTF-8'])
         db = Postgresql(initdb_args=initdb_args)
         fl = self.get_fl(override_config={'lister_db_url': db.url()})
         self.init_db(db, fl.MODEL)
         self.disable_storage_and_scheduler(fl)
         # FIXME: Separate the tests properly for the gitlab lister
         # did not succeed yet
         if not hasattr(fl, 'db_last_index'):  # gitlab lister cannot pass here
             return
-        fl.run(min_index=self.first_index)
+        fl.run(min_bound=self.first_index)
         self.assertEqual(fl.db_last_index(), self.last_index)
         partitions = fl.db_partition_indices(5)
         self.assertGreater(len(partitions), 0)
         for k in partitions:
             self.assertLessEqual(len(k), 5)
             self.assertGreater(len(k), 0)
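A service-specific test module then only fills in the abstract attributes of HttpListerTesterBase; sketched here with placeholder values, in the spirit of swh.lister.github.tests.test_gh_lister:

    import re
    import unittest

    from swh.lister.core.tests.test_lister import HttpListerTesterBase
    from swh.lister.example.lister import ExampleLister  # hypothetical


    class ExampleListerTester(HttpListerTesterBase, unittest.TestCase):
        Lister = ExampleLister
        # must capture the index value from the request url
        test_re = re.compile(r'/repositories\?since=([^?&]+)')
        lister_subdir = 'example'
        good_api_response_file = 'api_response.json'
        bad_api_response_file = 'api_empty_response.json'
        first_index = 0
        last_index = 982
        entries_per_page = 100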