Changeset View
Standalone View
swh/lister/pypi/lister.py
# Copyright (C) 2018-2021 The Software Heritage developers | # Copyright (C) 2018-2021 The Software Heritage developers | ||||||||||||||||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||||||||||||||||||
from collections import defaultdict | |||||||||||||||||||||||||||||||||||
from dataclasses import asdict, dataclass | |||||||||||||||||||||||||||||||||||
from datetime import datetime, timezone | |||||||||||||||||||||||||||||||||||
import logging | import logging | ||||||||||||||||||||||||||||||||||
from typing import Iterator, List, Optional | from time import sleep | ||||||||||||||||||||||||||||||||||
from typing import Any, Dict, Iterator, List, Optional, Tuple | |||||||||||||||||||||||||||||||||||
from xmlrpc.client import Fault, ServerProxy | |||||||||||||||||||||||||||||||||||
from bs4 import BeautifulSoup | from tenacity.before_sleep import before_sleep_log | ||||||||||||||||||||||||||||||||||
import requests | |||||||||||||||||||||||||||||||||||
from swh.lister.utils import retry_attempt, throttling_retry | |||||||||||||||||||||||||||||||||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||||||||||||||||||||||||||||||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||||||||||||||||||||||||||||||||
from .. import USER_AGENT | from ..pattern import CredentialsType, Lister | ||||||||||||||||||||||||||||||||||
from ..pattern import CredentialsType, StatelessLister | |||||||||||||||||||||||||||||||||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||||
PackageListPage = List[str] | # Type returned by the XML-RPC changelog call: | ||||||||||||||||||||||||||||||||||
# package, version, release timestamp, description, serial | |||||||||||||||||||||||||||||||||||
ChangelogEntry = Tuple[str, str, int, str, int] | |||||||||||||||||||||||||||||||||||
# Manipulated package updated type which is a subset information | |||||||||||||||||||||||||||||||||||
# of the ChangelogEntry type: package, max release date | |||||||||||||||||||||||||||||||||||
PackageUpdate = Tuple[str, datetime] | |||||||||||||||||||||||||||||||||||
# Type returned by listing a page of results | |||||||||||||||||||||||||||||||||||
PackageListPage = List[PackageUpdate] | |||||||||||||||||||||||||||||||||||
class PyPILister(StatelessLister[PackageListPage]): | @dataclass | ||||||||||||||||||||||||||||||||||
class PyPIListerState: | |||||||||||||||||||||||||||||||||||
"""State of PyPI lister""" | |||||||||||||||||||||||||||||||||||
last_serial: Optional[int] = None | |||||||||||||||||||||||||||||||||||
anlambert: since the pypi instance was visited | |||||||||||||||||||||||||||||||||||
"""Last seen serial when visiting the pypi instance""" | |||||||||||||||||||||||||||||||||||
def _if_rate_limited(retry_state) -> bool: | |||||||||||||||||||||||||||||||||||
"""Custom tenacity retry predicate to handle xmlrpc client error: | |||||||||||||||||||||||||||||||||||
.. code:: | |||||||||||||||||||||||||||||||||||
xmlrpc.client.Fault: <Fault -32500: 'HTTPTooManyRequests: The action could not | |||||||||||||||||||||||||||||||||||
be performed because there were too many requests by the client. Limit may reset | |||||||||||||||||||||||||||||||||||
in 1 seconds.'> | |||||||||||||||||||||||||||||||||||
""" | |||||||||||||||||||||||||||||||||||
attempt = retry_attempt(retry_state) | |||||||||||||||||||||||||||||||||||
return attempt.failed and isinstance(attempt.exception(), Fault) | |||||||||||||||||||||||||||||||||||
def pypi_url(package_name: str) -> str: | |||||||||||||||||||||||||||||||||||
"""Build pypi url out of a package name. | |||||||||||||||||||||||||||||||||||
""" | |||||||||||||||||||||||||||||||||||
return PyPILister.PACKAGE_URL.format(package_name=package_name) | |||||||||||||||||||||||||||||||||||
class PyPILister(Lister[PyPIListerState, PackageListPage]): | |||||||||||||||||||||||||||||||||||
"""List origins from PyPI. | """List origins from PyPI. | ||||||||||||||||||||||||||||||||||
""" | """ | ||||||||||||||||||||||||||||||||||
LISTER_NAME = "pypi" | LISTER_NAME = "pypi" | ||||||||||||||||||||||||||||||||||
INSTANCE = "pypi" # As of today only the main pypi.org is used | INSTANCE = "pypi" # As of today only the main pypi.org is used | ||||||||||||||||||||||||||||||||||
PACKAGE_LIST_URL = "https://pypi.org/pypi" # XML-RPC url | |||||||||||||||||||||||||||||||||||
PACKAGE_LIST_URL = "https://pypi.org/simple/" | |||||||||||||||||||||||||||||||||||
PACKAGE_URL = "https://pypi.org/project/{package_name}/" | PACKAGE_URL = "https://pypi.org/project/{package_name}/" | ||||||||||||||||||||||||||||||||||
def __init__( | def __init__( | ||||||||||||||||||||||||||||||||||
self, | self, | ||||||||||||||||||||||||||||||||||
scheduler: SchedulerInterface, | scheduler: SchedulerInterface, | ||||||||||||||||||||||||||||||||||
credentials: Optional[CredentialsType] = None, | credentials: Optional[CredentialsType] = None, | ||||||||||||||||||||||||||||||||||
): | ): | ||||||||||||||||||||||||||||||||||
super().__init__( | super().__init__( | ||||||||||||||||||||||||||||||||||
scheduler=scheduler, | scheduler=scheduler, | ||||||||||||||||||||||||||||||||||
url=self.PACKAGE_LIST_URL, | url=self.PACKAGE_LIST_URL, | ||||||||||||||||||||||||||||||||||
instance=self.INSTANCE, | instance=self.INSTANCE, | ||||||||||||||||||||||||||||||||||
credentials=credentials, | credentials=credentials, | ||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||
self.session = requests.Session() | # used as termination condition and if useful, becomes the new state when the | ||||||||||||||||||||||||||||||||||
self.session.headers.update( | # visit is done | ||||||||||||||||||||||||||||||||||
Done Inline Actionsmaybe self.last_processed_serial? olasd: maybe `self.last_processed_serial`? | |||||||||||||||||||||||||||||||||||
Done Inline Actions+1 ardumont: +1 | |||||||||||||||||||||||||||||||||||
{"Accept": "application/html", "User-Agent": USER_AGENT} | self.last_processed_serial: Optional[int] = None | ||||||||||||||||||||||||||||||||||
def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState: | |||||||||||||||||||||||||||||||||||
return PyPIListerState(last_serial=d.get("last_serial")) | |||||||||||||||||||||||||||||||||||
def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]: | |||||||||||||||||||||||||||||||||||
return asdict(state) | |||||||||||||||||||||||||||||||||||
@throttling_retry( | |||||||||||||||||||||||||||||||||||
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) | |||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||
def _changelog_last_serial(self, client: ServerProxy) -> int: | |||||||||||||||||||||||||||||||||||
"""Internal detail to allow throttling when calling the changelog last entry""" | |||||||||||||||||||||||||||||||||||
serial = client.changelog_last_serial() | |||||||||||||||||||||||||||||||||||
assert isinstance(serial, int) | |||||||||||||||||||||||||||||||||||
return serial | |||||||||||||||||||||||||||||||||||
Done Inline ActionsYou could use a dataclass PackageUpdate instead of a tuple here for better readability. anlambert: You could use a dataclass `PackageUpdate` instead of a tuple here for better readability. | |||||||||||||||||||||||||||||||||||
Done Inline Actionstotally, thx ;) ardumont: totally, thx ;) | |||||||||||||||||||||||||||||||||||
@throttling_retry( | |||||||||||||||||||||||||||||||||||
Done Inline ActionsExecute the listing of the last updates since last_visit_timestamp anlambert: Execute the listing of the last updates since last_visit_timestamp | |||||||||||||||||||||||||||||||||||
retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) | |||||||||||||||||||||||||||||||||||
) | |||||||||||||||||||||||||||||||||||
Done Inline Actions@olasd to explicit ^ when mocking the ServerProxy, here is the issue with whatever methods i'd like to mock ("the *annoying* implementation detail) AttributeError: <class 'xmlrpc.client.ServerProxy'> does not have the attribute 'last_serial' AttributeError: <class 'xmlrpc.client.ServerProxy'> does not have the attribute 'changelog_since_serial' ardumont: @olasd to explicit ^ when mocking the ServerProxy, here is the issue with whatever methods i'd… | |||||||||||||||||||||||||||||||||||
Done Inline ActionsMy suggestion would be to substitute xmlrpc.client.ServerProxy() with a (stubbed) class implementing just these two methods with hardcoded return values. I don't know how Mock() or MagicMock() behaves on classes which have dynamically generated attributes/methods, like the ones xmlrpc.client generates. olasd: My suggestion would be to substitute `xmlrpc.client.ServerProxy()` with a (stubbed) class… | |||||||||||||||||||||||||||||||||||
Done Inline ActionsI actually don't know how to do what you suggest, i'll check. ardumont: I actually don't know how to do what you suggest, i'll check.
Thanks. | |||||||||||||||||||||||||||||||||||
Done Inline Actionsreading it more slowly, i think i got it. ardumont: reading it more slowly, i think i got it. | |||||||||||||||||||||||||||||||||||
def _changelog_since_serial( | |||||||||||||||||||||||||||||||||||
self, client: ServerProxy, serial: int | |||||||||||||||||||||||||||||||||||
) -> List[ChangelogEntry]: | |||||||||||||||||||||||||||||||||||
"""Internal detail to allow throttling when calling the changelog listing""" | |||||||||||||||||||||||||||||||||||
sleep(1) # to avoid the initial warning about throttling | |||||||||||||||||||||||||||||||||||
Done Inline ActionsThe last visit timestamp anlambert: The last visit timestamp | |||||||||||||||||||||||||||||||||||
return client.changelog_since_serial(serial) # type: ignore | |||||||||||||||||||||||||||||||||||
def get_pages(self) -> Iterator[PackageListPage]: | def get_pages(self) -> Iterator[PackageListPage]: | ||||||||||||||||||||||||||||||||||
"""Iterate other changelog events per package, determine the max release date for that | |||||||||||||||||||||||||||||||||||
package and use that max release date as last_update. When the execution is | |||||||||||||||||||||||||||||||||||
done, this will also set the self.last_processed_serial attribute so we can | |||||||||||||||||||||||||||||||||||
finalize the state of the lister for the next visit. | |||||||||||||||||||||||||||||||||||
response = self.session.get(self.PACKAGE_LIST_URL) | Yields: | ||||||||||||||||||||||||||||||||||
List of Tuple of (package-name, max release-date) | |||||||||||||||||||||||||||||||||||
Done Inline ActionsIs is not client.changelog instead here ? anlambert: Is is not `client.changelog` instead here ? | |||||||||||||||||||||||||||||||||||
Done Inline Actionstotally, here falls apart the mocking part ;) ardumont: totally, here falls apart the mocking part ;)
nice catch! | |||||||||||||||||||||||||||||||||||
response.raise_for_status() | """ | ||||||||||||||||||||||||||||||||||
client = ServerProxy(self.url) | |||||||||||||||||||||||||||||||||||
page = BeautifulSoup(response.content, features="html.parser") | last_processed_serial = -1 | ||||||||||||||||||||||||||||||||||
if self.state.last_serial is not None: | |||||||||||||||||||||||||||||||||||
last_processed_serial = self.state.last_serial | |||||||||||||||||||||||||||||||||||
upstream_last_serial = self._changelog_last_serial(client) | |||||||||||||||||||||||||||||||||||
# Paginate through result of pypi, until we read everything | |||||||||||||||||||||||||||||||||||
Done Inline Actions
I'm a bit confused by these three variables. It seems that last_serial is never reused, so here's my proposal! olasd: I'm a bit confused by these three variables. It seems that `last_serial` is never reused, so… | |||||||||||||||||||||||||||||||||||
Done Inline Actionsthx, +1 again ardumont: thx, +1 again | |||||||||||||||||||||||||||||||||||
while last_processed_serial < upstream_last_serial: | |||||||||||||||||||||||||||||||||||
updated_packages = defaultdict(list) | |||||||||||||||||||||||||||||||||||
page_results = [p.text for p in page.find_all("a")] | for package, _, release_date, _, serial in self._changelog_since_serial( | ||||||||||||||||||||||||||||||||||
client, last_processed_serial | |||||||||||||||||||||||||||||||||||
): | |||||||||||||||||||||||||||||||||||
updated_packages[package].append(release_date) | |||||||||||||||||||||||||||||||||||
# Compute the max serial so we can stop when done | |||||||||||||||||||||||||||||||||||
last_processed_serial = max(last_processed_serial, serial) | |||||||||||||||||||||||||||||||||||
# Returns pages of result to flush regularly | |||||||||||||||||||||||||||||||||||
yield [ | |||||||||||||||||||||||||||||||||||
( | |||||||||||||||||||||||||||||||||||
pypi_url(package), | |||||||||||||||||||||||||||||||||||
datetime.fromtimestamp(max(release_dates)).replace( | |||||||||||||||||||||||||||||||||||
tzinfo=timezone.utc | |||||||||||||||||||||||||||||||||||
), | |||||||||||||||||||||||||||||||||||
) | |||||||||||||||||||||||||||||||||||
for package, release_dates in updated_packages.items() | |||||||||||||||||||||||||||||||||||
] | |||||||||||||||||||||||||||||||||||
yield page_results | self.last_processed_serial = upstream_last_serial | ||||||||||||||||||||||||||||||||||
def get_origins_from_page( | def get_origins_from_page( | ||||||||||||||||||||||||||||||||||
Done Inline Actions
olasd: | |||||||||||||||||||||||||||||||||||
self, packages_name: PackageListPage | self, packages: PackageListPage | ||||||||||||||||||||||||||||||||||
) -> Iterator[ListedOrigin]: | ) -> Iterator[ListedOrigin]: | ||||||||||||||||||||||||||||||||||
"""Convert a page of PyPI repositories into a list of ListedOrigins.""" | """Convert a page of PyPI repositories into a list of ListedOrigins.""" | ||||||||||||||||||||||||||||||||||
assert self.lister_obj.id is not None | assert self.lister_obj.id is not None | ||||||||||||||||||||||||||||||||||
for package_name in packages_name: | for origin, last_update in packages: | ||||||||||||||||||||||||||||||||||
package_url = self.PACKAGE_URL.format(package_name=package_name) | |||||||||||||||||||||||||||||||||||
yield ListedOrigin( | yield ListedOrigin( | ||||||||||||||||||||||||||||||||||
lister_id=self.lister_obj.id, | lister_id=self.lister_obj.id, | ||||||||||||||||||||||||||||||||||
url=package_url, | url=origin, | ||||||||||||||||||||||||||||||||||
visit_type="pypi", | visit_type="pypi", | ||||||||||||||||||||||||||||||||||
last_update=None, # available on PyPI JSON API | last_update=last_update, | ||||||||||||||||||||||||||||||||||
) | ) | ||||||||||||||||||||||||||||||||||
def finalize(self): | |||||||||||||||||||||||||||||||||||
"""Finalize the visit state by updating with the new last_serial if updates | |||||||||||||||||||||||||||||||||||
actually happened. | |||||||||||||||||||||||||||||||||||
""" | |||||||||||||||||||||||||||||||||||
self.updated = ( | |||||||||||||||||||||||||||||||||||
self.state | |||||||||||||||||||||||||||||||||||
and self.state.last_serial | |||||||||||||||||||||||||||||||||||
and self.last_processed_serial | |||||||||||||||||||||||||||||||||||
and self.state.last_serial < self.last_processed_serial | |||||||||||||||||||||||||||||||||||
) or (not self.state.last_serial and self.last_processed_serial) | |||||||||||||||||||||||||||||||||||
Done Inline Actions
olasd: | |||||||||||||||||||||||||||||||||||
if self.updated: | |||||||||||||||||||||||||||||||||||
self.state.last_serial = self.last_processed_serial |
since the pypi instance was visited