diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py
--- a/swh/lister/pypi/lister.py
+++ b/swh/lister/pypi/lister.py
@@ -3,32 +3,70 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import defaultdict
+from dataclasses import asdict, dataclass
+from datetime import datetime, timezone
 import logging
-from typing import Iterator, List, Optional
+from typing import Any, Dict, Iterator, List, Optional, Tuple
+from xmlrpc.client import Fault, ServerProxy
 
-from bs4 import BeautifulSoup
-import requests
+from tenacity.before_sleep import before_sleep_log
 
+from swh.lister.utils import retry_attempt, throttling_retry
 from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin
 
-from .. import USER_AGENT
-from ..pattern import CredentialsType, StatelessLister
+from ..pattern import CredentialsType, Lister
 
 logger = logging.getLogger(__name__)
 
-PackageListPage = List[str]
+# Type returned by the XML-RPC changelog call:
+# package, version, release timestamp, description, serial
+ChangelogEntry = Tuple[str, str, int, str, int]
+# Manipulated package updated type which is a subset information
+# of the ChangelogEntry type: package, max release date
+PackageUpdate = Tuple[str, datetime]
+# Type returned by listing a page of results
+PackageListPage = List[PackageUpdate]
 
 
-class PyPILister(StatelessLister[PackageListPage]):
+@dataclass
+class PyPIListerState:
+    """State of PyPI lister"""
+
+    last_serial: Optional[int] = None
+    """Last seen serial when visiting the pypi instance"""
+
+
+def _if_rate_limited(retry_state) -> bool:
+    """Custom tenacity retry predicate to handle xmlrpc client error:
+
+    .. code::
+
+        xmlrpc.client.Fault: <Fault -32500: 'HTTPTooManyRequests: The action could not
+        be performed because there were too many requests by the client. Limit may reset
+        in 1 seconds.'>
+
+    """
+    attempt = retry_attempt(retry_state)
+    return attempt.failed and isinstance(attempt.exception(), Fault)
+
+
+def pypi_url(package_name: str) -> str:
+    """Build pypi url out of a package name.
+
+    """
+    return PyPILister.PACKAGE_URL.format(package_name=package_name)
+
+
+class PyPILister(Lister[PyPIListerState, PackageListPage]):
     """List origins from PyPI.
 
     """
 
     LISTER_NAME = "pypi"
     INSTANCE = "pypi"  # As of today only the main pypi.org is used
-
-    PACKAGE_LIST_URL = "https://pypi.org/simple/"
+    PACKAGE_LIST_URL = "https://pypi.org/pypi"  # XML-RPC url
     PACKAGE_URL = "https://pypi.org/project/{package_name}/"
 
     def __init__(
@@ -43,35 +81,109 @@
             credentials=credentials,
         )
 
-        self.session = requests.Session()
-        self.session.headers.update(
-            {"Accept": "application/html", "User-Agent": USER_AGENT}
-        )
+        # used as termination condition and if useful, becomes the new state when the
+        # visit is done
+        self.last_processed_serial: Optional[int] = None
+
+    def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState:
+        return PyPIListerState(last_serial=d.get("last_serial"))
+
+    def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]:
+        return asdict(state)
+
+    @throttling_retry(
+        retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
+    )
+    def _changelog_last_serial(self, client: ServerProxy) -> int:
+        """Internal detail to allow throttling when calling the changelog last entry"""
+        serial = client.changelog_last_serial()
+        assert isinstance(serial, int)
+        return serial
+
+    @throttling_retry(
+        retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
+    )
+    def _changelog_since_serial(
+        self, client: ServerProxy, serial: int
+    ) -> List[ChangelogEntry]:
+        """Internal detail to allow throttling when calling the changelog listing"""
+        return client.changelog_since_serial(serial)  # type: ignore
 
     def get_pages(self) -> Iterator[PackageListPage]:
-
-        response = self.session.get(self.PACKAGE_LIST_URL)
-
-        response.raise_for_status()
-
-        page = BeautifulSoup(response.content, features="html.parser")
-
-        page_results = [p.text for p in page.find_all("a")]
-
-        yield page_results
+        """Iterate over changelog events per package, determine the max release date
+        for that package and use that max release date as last_update. When the
+        execution is done, this will also set the self.last_processed_serial attribute
+        so we can finalize the state of the lister for the next visit.
+
+        Note that the indirection method exists to hide the actual rpc calls details so
+        the testing is actually doable. Technically, the ServerProxy class used does not
+        expose exactly those methods due to internal implementation detail which makes
+        the testing hard for no good reason.
+
+        Yields:
+            List of Tuple of (package-name, max release-date)
+
+        """
+        client = ServerProxy(self.url)
+
+        last_processed_serial = -1
+        if self.state.last_serial is not None:
+            last_processed_serial = self.state.last_serial
+        upstream_last_serial = self._changelog_last_serial(client)
+
+        # Paginate through result of pypi, until we read everything
+        while last_processed_serial < upstream_last_serial:
+            updated_packages = defaultdict(list)
+
+            for package, _, release_date, _, serial in self._changelog_since_serial(
+                client, last_processed_serial
+            ):
+                updated_packages[package].append(release_date)
+                # Compute the max serial so we can stop when done
+                last_processed_serial = max(last_processed_serial, serial)
+
+            if not updated_packages:
+                # No event came back although upstream advertises a higher serial:
+                # stop here, otherwise the same empty query would loop forever
+                break
+
+            # Returns pages of result to flush regularly
+            yield [
+                (
+                    pypi_url(package),
+                    # Release timestamps are POSIX epochs: convert them straight to
+                    # UTC instead of relabelling the host's local time as UTC
+                    datetime.fromtimestamp(max(release_dates), tz=timezone.utc),
+                )
+                for package, release_dates in updated_packages.items()
+            ]
+
+        self.last_processed_serial = upstream_last_serial
 
     def get_origins_from_page(
-        self, packages_name: PackageListPage
+        self, packages: PackageListPage
     ) -> Iterator[ListedOrigin]:
         """Convert a page of PyPI repositories into a list of ListedOrigins."""
         assert self.lister_obj.id is not None
 
-        for package_name in packages_name:
-            package_url = self.PACKAGE_URL.format(package_name=package_name)
-
+        for origin, last_update in packages:
             yield ListedOrigin(
                 lister_id=self.lister_obj.id,
-                url=package_url,
+                url=origin,
                 visit_type="pypi",
-                last_update=None,  # available on PyPI JSON API
+                last_update=last_update,
             )
+
+    def finalize(self):
+        """Finalize the visit state by updating with the new last_serial if updates
+        actually happened.
+
+        """
+        previous_serial = self.state.last_serial
+        new_serial = self.last_processed_serial
+        # Compare against None explicitly so self.updated is always a real bool
+        self.updated = new_serial is not None and (
+            previous_serial is None or previous_serial < new_serial
+        )
+        if self.updated:
+            self.state.last_serial = new_serial
diff --git a/swh/lister/pypi/tasks.py b/swh/lister/pypi/tasks.py
--- a/swh/lister/pypi/tasks.py
+++ b/swh/lister/pypi/tasks.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018 the Software Heritage developers
+# Copyright (C) 2018-2021 the Software Heritage developers
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
@@ -7,7 +7,7 @@
 from .lister import PyPILister
 
 
-@shared_task(name=__name__ + ".PyPIListerTask")
+@shared_task(name=f"{__name__}.PyPIListerTask")
 def list_pypi():
     "Full listing of the PyPI registry"
     lister = PyPILister.from_configfile()
diff --git a/swh/lister/pypi/tests/data/https_pypi.org/simple b/swh/lister/pypi/tests/data/https_pypi.org/simple
deleted file mode 100644
--- a/swh/lister/pypi/tests/data/https_pypi.org/simple
+++ /dev/null
@@ -1,12 +0,0 @@
-<!DOCTYPE html>
-<html>
-  <head>
-    <title>Simple index</title>
-  </head>
-  <body>
-    <a href="/simple/0lever-so/">0lever-so</a>
-    <a href="/simple/0lever-utils/">0lever-utils</a>
-    <a href="/simple/0-orchestrator/">0-orchestrator</a>
-    <a href="/simple/0wned/">0wned</a>
-  </body>
-</html>
\ No newline at end of file
diff --git a/swh/lister/pypi/tests/test_lister.py b/swh/lister/pypi/tests/test_lister.py
--- a/swh/lister/pypi/tests/test_lister.py
+++ b/swh/lister/pypi/tests/test_lister.py
@@ -1,26 +1,19 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from pathlib import Path
+from collections import defaultdict
+from datetime import datetime, timezone
 from typing import List
 
 import pytest
-import requests
 
-from swh.lister.pypi.lister import PyPILister
+from swh.lister.pypi.lister import ChangelogEntry, PyPILister, pypi_url
+from swh.scheduler.interface import SchedulerInterface
 from swh.scheduler.model import ListedOrigin
 
 
-@pytest.fixture
-def pypi_packages_testdata(datadir):
-    content = Path(datadir, "https_pypi.org", "simple").read_bytes()
-    names = ["0lever-so", "0lever-utils", "0-orchestrator", "0wned"]
-    urls = [PyPILister.PACKAGE_URL.format(package_name=n) for n in names]
-    return content, names, urls
-
-
 def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]):
     """Asserts that the two collections have the same origin URLs"""
 
@@ -33,53 +26,6 @@
         assert l_url == s_origin.url
 
 
-def test_pypi_list(swh_scheduler, requests_mock, mocker, pypi_packages_testdata):
-
-    t_content, t_names, t_urls = pypi_packages_testdata
-
-    requests_mock.get(PyPILister.PACKAGE_LIST_URL, content=t_content)
-
-    lister = PyPILister(scheduler=swh_scheduler)
-
-    lister.get_origins_from_page = mocker.spy(lister, "get_origins_from_page")
-    lister.session.get = mocker.spy(lister.session, "get")
-
-    stats = lister.run()
-
-    scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
-
-    lister.session.get.assert_called_once_with(lister.PACKAGE_LIST_URL)
-    lister.get_origins_from_page.assert_called_once_with(t_names)
-
-    assert stats.pages == 1
-    assert stats.origins == 4
-    assert len(scheduler_origins) == 4
-
-    check_listed_origins(t_urls, scheduler_origins)
-
-    assert lister.get_state_from_scheduler() is None
-
-
-@pytest.mark.parametrize("http_code", [400, 429, 500])
-def test_pypi_list_http_error(swh_scheduler, requests_mock, mocker, http_code):
-
-    requests_mock.get(
-        PyPILister.PACKAGE_LIST_URL, [{"content": None, "status_code": http_code},],
-    )
-
-    lister = PyPILister(scheduler=swh_scheduler)
-
-    lister.session.get = mocker.spy(lister.session, "get")
-
-    with pytest.raises(requests.HTTPError):
-        lister.run()
-
-    lister.session.get.assert_called_once_with(lister.PACKAGE_LIST_URL)
-
-    scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
-    assert len(scheduler_origins) == 0
-
-
 @pytest.mark.parametrize(
     "credentials, expected_credentials",
     [
@@ -109,3 +55,203 @@
     lister = PyPILister.from_configfile()
     assert lister.scheduler is not None
     assert lister.credentials is not None
+
+
+def to_serial(changelog_entry: ChangelogEntry) -> int:
+    """Helper utility to read the serial entry in the tuple
+
+    Args:
+        changelog_entry: Changelog entry to read data from
+
+    Returns:
+        The serial from the entry
+
+    """
+    return changelog_entry[4]
+
+
+def configure_scheduler_state(
+    scheduler: SchedulerInterface, data: List[ChangelogEntry]
+):
+    """Allows to pre configure a last serial state for the lister consistent with the test
+    data set (the last_serial will be something inferior than the most minimal serial
+    in the data set).
+
+    Args:
+        scheduler: The actual scheduler instance used during test
+        data: The actual dataset used during test
+
+    """
+    # Compute the lowest serial to make it a minimum state to store in the scheduler
+    lowest_serial = min(map(to_serial, data))
+
+    # We'll need to configure the scheduler's state
+    lister_obj = scheduler.get_or_create_lister(
+        name=PyPILister.LISTER_NAME, instance_name=PyPILister.INSTANCE
+    )
+    lister_obj.current_state = {"last_serial": lowest_serial - 10}
+    scheduler.update_lister(lister_obj)
+
+
+@pytest.fixture
+def mock_pypi_xmlrpc(mocker, swh_scheduler):
+    """This setups a lister so it can actually fake the call to the rpc service executed
+    during an incremental listing.
+
+    To retrieve or update the faked data, open a python3 toplevel and execute the
+    following:
+
+    .. code:: python
+
+        from datetime import timezone, datetime, timedelta
+        from xmlrpc.client import ServerProxy
+        from swh.scheduler.utils import utcnow
+        RPC_URL = "https://pypi.org/pypi"
+        cli = ServerProxy(RPC_URL)
+        last_serial = cli.changelog_last_serial()
+        # 10854808
+        last_state_serial = 2168587
+        results = cli.changelog_since_serial(last_state_serial)
+
+    Returns:
+        Tuple of (highest serial of the faked data set, the faked changelog entries,
+        mock of PyPILister._changelog_last_serial, mock of
+        PyPILister._changelog_since_serial)
+
+    """
+
+    data = [
+        ["wordsmith", None, 1465998124, "add Owner DoublePlusAwks", 2168628],
+        ["wordsmith", "0.1", 1465998123, "new release", 2168629],
+        ["wordsmith", "0.1", 1465998131, "update classifiers", 2168630],
+        [
+            "UFx",
+            "1.0",
+            1465998207,
+            "update author_email, home_page, summary, description",
+            2168631,
+        ],
+        ["UFx", "1.0", 1465998236, "remove file UFx-1.0.tar.gz", 2168632],
+        ["wordsmith", "0.1", 1465998309, "update classifiers", 2168633],
+        [
+            "wordsmith",
+            "0.1",
+            1465998406,
+            "update summary, description, classifiers",
+            2168634,
+        ],
+        ["property-manager", "2.0", 1465998436, "new release", 2168635],
+        [
+            "property-manager",
+            "2.0",
+            1465998439,
+            "add source file property-manager-2.0.tar.gz",
+            2168636,
+        ],
+        ["numtest", "2.0.0", 1465998446, "new release", 2168637],
+        ["property-manager", "2.1", 1465998468, "new release", 2168638],
+        [
+            "property-manager",
+            "2.1",
+            1465998472,
+            "add source file property-manager-2.1.tar.gz",
+            2168639,
+        ],
+        ["kafka-utils", "0.2.0", 1465998477, "new release", 2168640],
+        [
+            "kafka-utils",
+            "0.2.0",
+            1465998480,
+            "add source file kafka-utils-0.2.0.tar.gz",
+            2168641,
+        ],
+        ["numtest", "2.0.1", 1465998520, "new release", 2168642],
+        ["coala-bears", "0.3.0.dev20160615134909", 1465998552, "new release", 2168643],
+        [
+            "coala-bears",
+            "0.3.0.dev20160615134909",
+            1465998556,
+            "add py3 file coala_bears-0.3.0.dev20160615134909-py3-none-any.whl",
+            2168644,
+        ],
+        ["django_sphinxsearch", "0.4.0", 1465998571, "new release", 2168645],
+        [
+            "django_sphinxsearch",
+            "0.4.0",
+            1465998573,
+            "add source file django_sphinxsearch-0.4.0.tar.gz",
+            2168646,
+        ],
+        [
+            "coala-bears",
+            "0.3.0.dev20160615134909",
+            1465998589,
+            "add source file coala-bears-0.3.0.dev20160615134909.tar.gz",
+            2168647,
+        ],
+    ]
+    highest_serial = max(map(to_serial, data))
+
+    # mock_last_serial = mocker.patch("xmlrpc.client.ServerProxy.last_serial")
+    # not working ^: AttributeError: <class 'xmlrpc.client.ServerProxy'> does not have
+    # the attribute 'last_serial'
+    mock_last_serial = mocker.patch(
+        "swh.lister.pypi.lister.PyPILister._changelog_last_serial"
+    )
+    mock_last_serial.return_value = highest_serial
+
+    # mock = mocker.patch("xmlrpc.client.ServerProxy.changelog_since_serial")
+    # not working ^: AttributeError ^
+    mock_since_serial = mocker.patch(
+        "swh.lister.pypi.lister.PyPILister._changelog_since_serial"
+    )
+    mock_since_serial.return_value = data
+
+    return highest_serial, data, mock_last_serial, mock_since_serial
+
+
+@pytest.mark.parametrize("configure_state", [True, False])
+def test_lister_pypi_run(mock_pypi_xmlrpc, swh_scheduler, configure_state):
+    highest_serial, data, mock_last_serial, mock_since_serial = mock_pypi_xmlrpc
+
+    if configure_state:
+        configure_scheduler_state(swh_scheduler, data)
+
+    updated_packages = defaultdict(list)
+    for [package, _, release_date, _, _] in data:
+        updated_packages[package].append(release_date)
+
+    assert len(updated_packages) > 0
+
+    expected_last_updates = {
+        pypi_url(package): datetime.fromtimestamp(max(releases), tz=timezone.utc)
+        for package, releases in updated_packages.items()
+    }
+
+    expected_pypi_urls = [pypi_url(package_name) for package_name in updated_packages]
+
+    lister = PyPILister(scheduler=swh_scheduler)
+
+    stats = lister.run()
+
+    assert mock_last_serial.called and mock_since_serial.called
+    assert stats.pages == 1
+    assert stats.origins == len(updated_packages)
+
+    scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
+    assert len(scheduler_origins) == stats.origins
+
+    check_listed_origins(expected_pypi_urls, scheduler_origins)
+
+    actual_scheduler_state = lister.get_state_from_scheduler()
+    # This new visit updated the state to the new one
+    assert actual_scheduler_state.last_serial == highest_serial
+
+    for listed_origin in scheduler_origins:
+        assert listed_origin.last_update is not None
+        assert listed_origin.last_update == expected_last_updates[listed_origin.url]
+
+
+def test__if_rate_limited():
+    # TODO
+    pass