Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/pypi/lister.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | |||||
import random | import random | ||||
from typing import Any, Dict | import time | ||||
from typing import Any, Dict, Iterator, List | |||||
from requests import Response | import requests | ||||
import xmltodict | import xmltodict | ||||
from swh.lister.core.lister_transports import ListerOnePageApiTransport | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.lister.core.simple_lister import SimpleLister | from swh.scheduler.model import ListedOrigin | ||||
from swh.scheduler import utils | |||||
from .models import PyPIModel | from .. import USER_AGENT | ||||
from ..pattern import StatelessLister | |||||
logger = logging.getLogger(__name__) | |||||
class PyPILister(ListerOnePageApiTransport, SimpleLister): | PyPIPageType = List[Dict[str, Any]] | ||||
MODEL = PyPIModel | |||||
LISTER_NAME = "pypi" | |||||
PAGE = "https://pypi.org/simple/" | |||||
instance = "pypi" # As of today only the main pypi.org is used | |||||
def __init__(self, override_config=None): | |||||
ListerOnePageApiTransport.__init__(self) | |||||
SimpleLister.__init__(self, override_config=override_config) | |||||
def task_dict(self, origin_type: str, origin_url: str, **kwargs): | |||||
"""(Override) Return task format dict | |||||
This is overridden from the lister_base as more information is | class PyPILister(StatelessLister[PyPIPageType]): | ||||
needed for the ingestion task creation. | """List origins from PyPI. | ||||
""" | """ | ||||
_type = "load-%s" % origin_type | |||||
_policy = kwargs.get("policy", "recurring") | |||||
return utils.create_task_dict(_type, _policy, url=origin_url) | |||||
def list_packages(self, response: Response) -> list: | |||||
"""(Override) List the actual pypi origins from the response. | |||||
olasd: This lister makes a single request, so I don't think the retry/rate limit logic is really… | |||||
Done Inline Actions Thanks, I was not sure about that, so I made the conservative choice of keeping it because it was implicitly handled before and may happen in production. Will remove. tenma: Thanks, I was not sure about that, so I made the conservative choice of keeping it because it was… | |||||
""" | LISTER_NAME = "pypi" | ||||
result = xmltodict.parse(response.content) | INSTANCE = "pypi" # As of today only the main pypi.org is used | ||||
_packages = [p["#text"] for p in result["html"]["body"]["a"]] | |||||
random.shuffle(_packages) | |||||
return _packages | |||||
def origin_url(self, repo_name: str) -> str: | PACKAGE_LIST_URL = "https://pypi.org/simple/" | ||||
"""Returns origin_url | PACKAGE_URL = "https://pypi.org/project/{package_name}/" | ||||
""" | BACKOFF_FACTOR = 10 | ||||
return "https://pypi.org/project/%s/" % repo_name | MAX_RETRIES = 5 | ||||
def get_model_from_repo(self, repo_name: str) -> Dict[str, Any]: | def __init__(self, scheduler: SchedulerInterface): | ||||
"""(Override) Transform from repository representation to model | super().__init__( | ||||
scheduler=scheduler, | |||||
credentials=None, | |||||
url=self.PACKAGE_LIST_URL, | |||||
instance=self.INSTANCE, | |||||
) | |||||
self.session = requests.Session() | |||||
self.session.headers.update( | |||||
{"Accept": "application/html", "User-Agent": USER_AGENT} | |||||
) | |||||
def get_pages(self) -> Iterator[PyPIPageType]: | |||||
backoff = self.BACKOFF_FACTOR | |||||
for request_count in range(self.MAX_RETRIES): | |||||
response = self.session.get(self.PACKAGE_LIST_URL) | |||||
# handle rate-limiting | |||||
if response.status_code == 429: | |||||
logger.info("Rate limit was hit, sleeping %ss", backoff) | |||||
time.sleep(backoff) | |||||
backoff *= self.BACKOFF_FACTOR | |||||
else: | |||||
# not rate-limited | |||||
break | |||||
else: | |||||
logger.info("Max number of attempts hit (%s), giving up", self.MAX_RETRIES) | |||||
return | |||||
# log other HTTP errors | |||||
if response.status_code != 200: | |||||
logger.warning( | |||||
"Got unexpected status_code %s: %s", | |||||
response.status_code, | |||||
response.content, | |||||
) | |||||
return | |||||
""" | result = xmltodict.parse(response.content) | ||||
origin_url = self.origin_url(repo_name) | packages_name = [p["#text"] for p in result["html"]["body"]["a"]] | ||||
Done Inline Actions This shuffle isn't needed anymore: we can just insert all origins in order. olasd: This shuffle isn't needed anymore: we can just insert all origins in order. | |||||
return { | random.shuffle(packages_name) | ||||
"uid": origin_url, | yield packages_name | ||||
"name": repo_name, | |||||
"full_name": repo_name, | def get_origins_from_page( | ||||
"html_url": origin_url, | self, packages_name: PyPIPageType | ||||
"origin_url": origin_url, | ) -> Iterator[ListedOrigin]: | ||||
"origin_type": "pypi", | """Convert a page of PyPI repositories into a list of ListedOrigins.""" | ||||
} | assert self.lister_obj.id is not None | ||||
for package_name in packages_name: | |||||
package_url = self.PACKAGE_URL.format(package_name=package_name) | |||||
yield ListedOrigin( | |||||
lister_id=self.lister_obj.id, | |||||
url=package_url, | |||||
visit_type="pypi", | |||||
last_update=None, # available but costs +1 request per project | |||||
) | |||||
Done Inline Actions Available on the PyPI JSON API. anlambert: Available on the PyPI JSON API. |
This lister makes a single request, so I don't think the retry/rate limit logic is really needed. We can just restart the lister if it fails.