diff --git a/swh/lister/core/lister_transports.py b/swh/lister/core/lister_transports.py index 7f77449..a1f346c 100644 --- a/swh/lister/core/lister_transports.py +++ b/swh/lister/core/lister_transports.py @@ -1,218 +1,232 @@ # Copyright (C) 2017-2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import random from datetime import datetime from email.utils import parsedate from pprint import pformat +from xmlrpc import client import requests import xmltodict try: from swh.lister._version import __version__ except ImportError: __version__ = 'devel' from .abstractattribute import AbstractAttribute from .lister_base import FetchError class ListerXMLRPCTransport(abc.ABC): """Use the xmlrpc library for making Lister endpoint requests. To be used in conjunction with SWHListerBase or a subclass of it. """ SERVER = AbstractAttribute('string containing the server to contact for ' 'information') def __init__(self): self.lister_version = __version__ def get_client(self, path): """Initialize client to query for result """ - from xmlrpc import client return client.ServerProxy(path) - def list_packages(self, client): - """Listing method - - """ - pass - def request_uri(self, _): """Same uri called once """ return self.SERVER def request_params(self, identifier): """Cannot pass any parameters to query to the xmlrpc client so cannot even pass our user-agent specifics. """ return {} def transport_quota_check(self, response): """No rate limit dealing explained. """ return False, 0 def transport_request(self, identifier): - """Implements SWHListerBase.transport_request for HTTP using Requests. + """Implements SWHListerBase.transport_request """ path = self.request_uri(identifier) - # params = self.request_params(identifier) # we cannot use this... - try: - _client = self.get_client(path) - return self.list_packages(_client) + return self.get_client(path) except Exception as e: raise FetchError(e) def transport_response_to_string(self, response): """Implements SWHListerBase.transport_response_to_string for XMLRPC given responses. + """ s = pformat(self.SERVER) - s += '\n#\n' + pformat(response) + s += '\n#\n' + pformat(response) # Note: will potentially be big return s class SWHListerHttpTransport(abc.ABC): """Use the Requests library for making Lister endpoint requests. To be used in conjunction with SWHListerBase or a subclass of it. """ PATH_TEMPLATE = AbstractAttribute('string containing a python string' ' format pattern that produces the API' ' endpoint path for listing stored' ' repositories when given an index.' ' eg. "/repositories?after=%s".' 'To be implemented in the API-specific' ' class inheriting this.') EXPECTED_STATUS_CODES = (200, 429, 403, 404) def request_headers(self): """Returns dictionary of any request headers needed by the server. MAY BE OVERRIDDEN if request headers are needed. """ return { 'User-Agent': 'Software Heritage lister (%s)' % self.lister_version } def request_uri(self, identifier): """Get the full request URI given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the PATH_TEMPLATE is required. """ path = self.PATH_TEMPLATE % identifier return self.api_baseurl + path def request_params(self, identifier): """Get the full parameters passed to requests given the transport_request identifier. MAY BE OVERRIDDEN if something more complex than the request headers is needed. """ params = {} params['headers'] = self.request_headers() or {} creds = self.config['credentials'] auth = random.choice(creds) if creds else None if auth: params['auth'] = (auth['username'], auth['password']) return params def transport_quota_check(self, response): """Implements SWHListerBase.transport_quota_check with standard 429 code check for HTTP with Requests library. MAY BE OVERRIDDEN if the server notifies about rate limits in a non-standard way that doesn't use HTTP 429 and the Retry-After response header. ( https://tools.ietf.org/html/rfc6585#section-4 ) """ if response.status_code == 429: # HTTP too many requests retry_after = response.headers.get('Retry-After', self.back_off()) try: # might be seconds return True, float(retry_after) except Exception: # might be http-date at_date = datetime(*parsedate(retry_after)[:6]) from_now = (at_date - datetime.today()).total_seconds() + 5 return True, max(0, from_now) else: # response ok self.reset_backoff() return False, 0 def __init__(self, api_baseurl=None): if not api_baseurl: raise NameError('HTTP Lister Transport requires api_baseurl.') self.api_baseurl = api_baseurl # eg. 'https://api.github.com' self.session = requests.Session() self.lister_version = __version__ def _transport_action(self, identifier, method='get'): """Permit to ask information to the api prior to actually executing query. """ path = self.request_uri(identifier) params = self.request_params(identifier) try: if method == 'head': response = self.session.head(path, **params) else: response = self.session.get(path, **params) except requests.exceptions.ConnectionError as e: raise FetchError(e) else: if response.status_code not in self.EXPECTED_STATUS_CODES: raise FetchError(response) return response def transport_head(self, identifier): """Retrieve head information on api. """ return self._transport_action(identifier, method='head') def transport_request(self, identifier): """Implements SWHListerBase.transport_request for HTTP using Requests. Retrieve get information on api. """ return self._transport_action(identifier) def transport_response_to_string(self, response): """Implements SWHListerBase.transport_response_to_string for HTTP given Requests responses. """ s = pformat(response.request.path_url) s += '\n#\n' + pformat(response.request.headers) s += '\n#\n' + pformat(response.status_code) s += '\n#\n' + pformat(response.headers) s += '\n#\n' try: # json? s += pformat(response.json()) except Exception: # not json try: # xml? s += pformat(xmltodict.parse(response.text)) except Exception: # not xml s += pformat(response.text) return s + + +class ListerOnePageApiTransport(SWHListerHttpTransport): + """Use the request library for retrieving a basic html page and parse + the result. + + To be used in conjunction with SWHListerBase or a subclass of it. + + """ + PAGE = AbstractAttribute("The server api's unique page to retrieve and " + "parse for information") + PATH_TEMPLATE = None # we do not use it + + def __init__(self, api_baseurl=None): + self.session = requests.Session() + self.lister_version = __version__ + + def request_uri(self, _): + """Get the full request URI given the transport_request identifier. + + """ + return self.PAGE diff --git a/swh/lister/core/simple_lister.py b/swh/lister/core/simple_lister.py index 42b707c..c9e9d6b 100644 --- a/swh/lister/core/simple_lister.py +++ b/swh/lister/core/simple_lister.py @@ -1,67 +1,76 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import abc import logging from .lister_base import SWHListerBase class SimpleLister(SWHListerBase): """Lister* intermediate class for any service that follows the simple, 'list in oneshot information' pattern. - Client sends a request to list repositories in oneshot - Client receives structured (json/xml/etc) response with information and stores those in db """ + @abc.abstractmethod + def list_packages(self, *args): + """Listing packages method. + + """ + pass + def ingest_data(self, identifier, checks=False): """Rework the base ingest_data. Request server endpoint which gives all in one go. Simplify and filter response list of repositories. Inject repo information into local db. Queue loader tasks for linked repositories. Args: identifier: Resource identifier (unused) checks (bool): Additional checks required (unused) """ # Request (partial?) list of repositories info response = self.safely_issue_request(identifier) + response = self.list_packages(response) if not response: return response, [] models_list = self.transport_response_simplified(response) models_list = self.filter_before_inject(models_list) from swh.core import utils all_injected = [] for models in utils.grouper(models_list, n=1000): models = list(models) logging.debug('models: %s' % len(models)) # inject into local db injected = self.inject_repo_data_into_db(models) # queue workers self.create_missing_origins_and_tasks(models, injected) all_injected.append(injected) # flush self.db_session.commit() self.db_session = self.mk_session() return response, all_injected def run(self): """Query the server which answers in one query. Stores the information, dropping actual redundant information we already have. Returns: nothing """ dump_not_used_identifier = 0 response, injected_repos = self.ingest_data(dump_not_used_identifier) if not response and not injected_repos: logging.info('No response from api server, stopping') diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py index 857b951..294f0b4 100644 --- a/swh/lister/pypi/lister.py +++ b/swh/lister/pypi/lister.py @@ -1,70 +1,74 @@ # Copyright (C) 2018 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import xmltodict + from .models import PyPiModel from swh.scheduler import utils from swh.lister.core.simple_lister import SimpleLister -from swh.lister.core.lister_transports import ListerXMLRPCTransport +from swh.lister.core.lister_transports import ListerOnePageApiTransport -class PyPiLister(ListerXMLRPCTransport, SimpleLister): +class PyPiLister(ListerOnePageApiTransport, SimpleLister): # Template path expecting an integer that represents the page id MODEL = PyPiModel LISTER_NAME = 'pypi' - SERVER = 'https://pypi.org/pypi' + PAGE = 'https://pypi.org/simple/' def __init__(self, override_config=None): - ListerXMLRPCTransport.__init__(self) + ListerOnePageApiTransport .__init__(self) SimpleLister.__init__(self, override_config=override_config) def task_dict(self, origin_type, origin_url, **kwargs): """(Override) Return task format dict This is overridden from the lister_base as more information is needed for the ingestion task creation. """ _type = 'origin-update-%s' % origin_type _policy = 'recurring' project_metadata_url = kwargs.get('html_url') return utils.create_task_dict( _type, _policy, origin_url, project_metadata_url=project_metadata_url) - def list_packages(self, client): - """(Override) List the actual pypi origins from the api. + def list_packages(self, response): + """(Override) List the actual pypi origins from the response. """ - return client.list_packages() + result = xmltodict.parse(response.content) + _all = result['html']['body']['a'] + return [package['#text'] for package in _all] def _compute_urls(self, repo_name): """Returns a tuple (project_url, project_metadata_url) """ return ( 'https://pypi.org/pypi/%s/' % repo_name, 'https://pypi.org/pypi/%s/json' % repo_name ) def get_model_from_repo(self, repo_name): """(Override) Transform from repository representation to model """ project_url, project_url_meta = self._compute_urls(repo_name) return { 'uid': repo_name, 'name': repo_name, 'full_name': repo_name, 'html_url': project_url_meta, 'origin_url': project_url, 'origin_type': 'pypi', 'description': None, } def transport_response_simplified(self, response): """(Override) Transform response to list for model manipulation """ return [self.get_model_from_repo(repo_name) for repo_name in response]