diff --git a/swh/lister/core/indexing_lister.py b/swh/lister/core/indexing_lister.py index 21d4a82..834d5f9 100644 --- a/swh/lister/core/indexing_lister.py +++ b/swh/lister/core/indexing_lister.py @@ -1,249 +1,249 @@ # Copyright (C) 2015-2019 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from itertools import count import dateutil from sqlalchemy import func from .lister_transports import ListerHttpTransport from .lister_base import ListerBase logger = logging.getLogger(__name__) class IndexingLister(ListerBase): """Lister* intermediate class for any service that follows the pattern: - The service must report at least one stable unique identifier, known herein as the UID value, for every listed repository. - If the service splits the list of repositories into sublists, it must report at least one stable and sorted index identifier for every listed repository, known herein as the indexable value, which can be used as part of the service endpoint query to request a sublist beginning from that index. This might be the UID if the UID is monotonic. - Client sends a request to list repositories starting from a given index. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories starting from that index and, if necessary/available, some indication of the URL or index for fetching the next series of repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class and provide the required overrides in addition to any unmet implementation/override requirements of this class's base. (see parent class and member docstrings for details) Required Overrides:: def get_next_target_from_response """ flush_packet_db = 20 """Number of iterations in-between write flushes of lister repositories to db (see fn:`run`). """ default_min_bound = '' """Default initialization value for the minimum boundary index to use when undefined (see fn:`run`). """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint identifier given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass # You probably don't need to override anything below this line. def filter_before_inject(self, models_list): """Overrides ListerBase.filter_before_inject Bounds query results by this Lister's set max_index. """ models_list = [ m for m in models_list if self.is_within_bounds(m['indexable'], None, self.max_index) ] return models_list def db_query_range(self, start, end): """Look in the db for a range of repositories with indexable values in the range [start, end] Args: start (model indexable type): start of desired indexable range end (model indexable type): end of desired indexable range Returns: a list of sqlalchemy.ext.declarative.declarative_base objects with indexable values within the given range """ retlist = self.db_session.query(self.MODEL) if start is not None: retlist = retlist.filter(self.MODEL.indexable >= start) if end is not None: retlist = retlist.filter(self.MODEL.indexable <= end) return retlist def db_partition_indices(self, partition_size): """Describe an index-space compartmentalization of the db table in equal sized chunks. This is used to describe min&max bounds for parallelizing fetch tasks. Args: partition_size (int): desired size to make each partition Returns: a list of tuples (begin, end) of indexable value that declare approximately equal-sized ranges of existing repos """ n = max(self.db_num_entries(), 10) partition_size = min(partition_size, n) n_partitions = n // partition_size min_index = self.db_first_index() max_index = self.db_last_index() if not min_index or not max_index: # Nothing to list return [] if isinstance(min_index, str): def format_bound(bound): return bound.isoformat() min_index = dateutil.parser.parse(min_index) max_index = dateutil.parser.parse(max_index) else: def format_bound(bound): return bound partition_width = (max_index - min_index) / n_partitions partitions = [ [ format_bound(min_index + i * partition_width), format_bound(min_index + (i+1) * partition_width), ] for i in range(n_partitions) ] # Remove bounds for lowest and highest partition partitions[0][0] = None partitions[-1][1] = None return [tuple(partition) for partition in partitions] def db_first_index(self): """Look in the db for the smallest indexable value Returns: the smallest indexable value of all repos in the db """ t = self.db_session.query(func.min(self.MODEL.indexable)).first() if t: return t[0] def db_last_index(self): """Look in the db for the largest indexable value Returns: the largest indexable value of all repos in the db """ t = self.db_session.query(func.max(self.MODEL.indexable)).first() if t: return t[0] def disable_deleted_repo_tasks(self, start, end, keep_these): """Disable tasks for repos that no longer exist between start and end. Args: start: beginning of range to disable end: end of range to disable keep_these (uid list): do not disable repos with uids in this list """ if end is None: end = self.db_last_index() if not self.is_within_bounds(end, None, self.max_index): end = self.max_index deleted_repos = self.winnow_models( self.db_query_range(start, end), self.MODEL.uid, keep_these ) tasks_to_disable = [repo.task_id for repo in deleted_repos if repo.task_id is not None] if tasks_to_disable: self.scheduler.disable_tasks(tasks_to_disable) for repo in deleted_repos: repo.task_id = None def run(self, min_bound=None, max_bound=None): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring, continually fetching sublists until either there is no next index reference given or the given next index is greater than the desired max_bound. Args: min_bound (indexable type): optional index to start from max_bound (indexable type): optional index to stop at Returns: nothing """ self.min_index = min_bound self.max_index = max_bound def ingest_indexes(): index = min_bound or self.default_min_bound for i in count(1): response, injected_repos = self.ingest_data(index) if not response and not injected_repos: logger.info('No response from api server, stopping') return next_index = self.get_next_target_from_response(response) # Determine if any repos were deleted, and disable their tasks. keep_these = list(injected_repos.keys()) self.disable_deleted_repo_tasks(index, next_index, keep_these) # termination condition if next_index is None or next_index == index: logger.info('stopping after index %s, no next link found', index) return index = next_index logger.debug('Index: %s', index) yield i for i in ingest_indexes(): if (i % self.flush_packet_db) == 0: logger.debug('Flushing updates at index %s', i) self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class IndexingHttpLister(ListerHttpTransport, IndexingLister): """Convenience class for ensuring right lookup and init order when combining IndexingLister and ListerHttpTransport.""" def __init__(self, api_baseurl=None, override_config=None): - ListerHttpTransport.__init__(self, api_baseurl=api_baseurl) IndexingLister.__init__(self, override_config=override_config) + ListerHttpTransport.__init__(self, api_baseurl=api_baseurl) diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 2bacb6c..ff0827c 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,225 +1,229 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat import logging import requests import xmltodict try: from swh.lister._version import __version__ except ImportError: __version__ = 'devel' from .abstractattribute import AbstractAttribute from .lister_base import FetchError logger = logging.getLogger(__name__) class ListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with ListerBase or a subclass of it. """ - + DEFAULT_URL = None PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } def request_instance_credentials(self): """Returns dictionary of any credentials configuration needed by the forge instance to list. The 'credentials' configuration is expected to be a dict of multiple levels. The first level is the lister's name, the second is the lister's instance name, which value is expected to be a list of credential structures (typically a couple username/password). For example: credentials: github: # github lister github: # has only one instance (so far) - username: some password: somekey - username: one password: onekey - ... gitlab: # gitlab lister riseup: # has many instances - username: someone password: ... - ... gitlab: - username: someone password: ... - ... Returns: list of credential dicts for the current lister. """ all_creds = self.config.get('credentials') if not all_creds: return [] lister_creds = all_creds.get(self.LISTER_NAME, {}) creds = lister_creds.get(self.instance, []) return creds def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.api_baseurl + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. This uses credentials if any are provided (see request_instance_credentials). MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} creds = self.request_instance_credentials() if not creds: return params auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements ListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, api_baseurl=None): + if not api_baseurl: + api_baseurl = self.config.get('api_baseurl') + if not api_baseurl: + api_baseurl = self.DEFAULT_URL if not api_baseurl: raise NameError('HTTP Lister Transport requires api_baseurl.') self.api_baseurl = api_baseurl # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: logger.warning('Failed to fetch %s: %s', path, e) raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements ListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements ListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s class ListerOnePageApiTransport(ListerHttpTransport): """Leverage requests library to retrieve basic html page and parse result. To be used in conjunction with ListerBase or a subclass of it. """ PAGE = AbstractAttribute("The server api's unique page to retrieve and " "parse for information") PATH_TEMPLATE = None # we do not use it def __init__(self, api_baseurl=None): self.session = requests.Session() self.lister_version = __version__ def request_uri(self, _): """Get the full request URI given the transport_request identifier. """ return self.PAGE diff --git a/swh/lister/core/page_by_page_lister.py b/swh/lister/core/page_by_page_lister.py index f05b3a5..4895068 100644 --- a/swh/lister/core/page_by_page_lister.py +++ b/swh/lister/core/page_by_page_lister.py @@ -1,160 +1,160 @@ # Copyright (C) 2015-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import logging from .lister_transports import ListerHttpTransport from .lister_base import ListerBase class PageByPageLister(ListerBase): """Lister* intermediate class for any service that follows the simple pagination page pattern. - Client sends a request to list repositories starting from a given page identifier. - Client receives structured (json/xml/etc) response with information about a sequential series of repositories (per page) starting from a given index. And, if available, some indication of the next page index for fetching the remaining repository data. See :class:`swh.lister.core.lister_base.ListerBase` for more details. This class cannot be instantiated. To create a new Lister for a source code listing service that follows the model described above, you must subclass this class. Then provide the required overrides in addition to any unmet implementation/override requirements of this class's base (see parent class and member docstrings for details). Required Overrides:: def get_next_target_from_response """ @abc.abstractmethod def get_next_target_from_response(self, response): """Find the next server endpoint page given the entire response. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use the headers links to provide the next page. Args: response (transport response): response page from the server Returns: index of next page, possibly extracted from a next href url """ pass @abc.abstractmethod def get_pages_information(self): """Find the total number of pages. Implementation of this method depends on the server API spec and the shape of the network response object returned by the transport_request method. For example, some api can use dedicated headers: - x-total-pages to provide the total number of pages - x-total to provide the total number of repositories - x-per-page to provide the number of elements per page Returns: tuple (total number of repositories, total number of pages, per_page) """ pass # You probably don't need to override anything below this line. def do_additional_checks(self, models_list): """Potentially check for existence of repositories in models_list. This will be called only if check_existence is flipped on in the run method below. """ for m in models_list: sql_repo = self.db_query_equal('uid', m['uid']) if sql_repo: return False return models_list def run(self, min_bound=None, max_bound=None, check_existence=False): """Main entry function. Sequentially fetches repository data from the service according to the basic outline in the class docstring. Continually fetching sublists until either there is no next page reference given or the given next page is greater than the desired max_page. Args: min_bound: optional page to start from max_bound: optional page to stop at check_existence (bool): optional existence check (for incremental lister whose sort order is inverted) Returns: nothing """ page = min_bound or 0 loop_count = 0 self.min_page = min_bound self.max_page = max_bound while self.is_within_bounds(page, self.min_page, self.max_page): logging.info('listing repos starting at %s' % page) response, injected_repos = self.ingest_data(page, checks=check_existence) if not response and not injected_repos: logging.info('No response from api server, stopping') break elif not injected_repos: logging.info('Repositories already seen, stopping') break next_page = self.get_next_target_from_response(response) # termination condition if (next_page is None) or (next_page == page): logging.info('stopping after page %s, no next link found' % page) break else: page = next_page loop_count += 1 if loop_count == 20: logging.info('flushing updates') loop_count = 0 self.db_session.commit() self.db_session = self.mk_session() self.db_session.commit() self.db_session = self.mk_session() class PageByPageHttpLister(ListerHttpTransport, PageByPageLister): """Convenience class for ensuring right lookup and init order when combining PageByPageLister and ListerHttpTransport. """ def __init__(self, api_baseurl=None, override_config=None): - ListerHttpTransport.__init__(self, api_baseurl=api_baseurl) PageByPageLister.__init__(self, override_config=override_config) + ListerHttpTransport.__init__(self, api_baseurl=api_baseurl)