diff --git a/requirements.txt b/requirements.txt index 8d9bb82..c57eecc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ python_debian requests setuptools iso8601 beautifulsoup4 launchpadlib -tenacity +tenacity >= 6.2 xmltodict diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py index 6c3f7e6..61006b0 100644 --- a/swh/lister/gitlab/lister.py +++ b/swh/lister/gitlab/lister.py @@ -1,265 +1,265 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass import logging import random from typing import Any, Dict, Iterator, Optional, Tuple from urllib.parse import parse_qs, urlencode, urlparse import iso8601 import requests from requests.exceptions import HTTPError from requests.status_codes import codes from tenacity.before_sleep import before_sleep_log from swh.lister import USER_AGENT from swh.lister.pattern import CredentialsType, Lister -from swh.lister.utils import is_retryable_exception, retry_attempt, throttling_retry +from swh.lister.utils import is_retryable_exception, throttling_retry from swh.scheduler.model import ListedOrigin logger = logging.getLogger(__name__) # Some instance provides hg_git type which can be ingested as hg origins VCS_MAPPING = {"hg_git": "hg"} @dataclass class GitLabListerState: """State of the GitLabLister""" last_seen_next_link: Optional[str] = None """Last link header (not visited yet) during an incremental pass """ Repository = Dict[str, Any] @dataclass class PageResult: """Result from a query to a gitlab project api page.""" repositories: Optional[Tuple[Repository, ...]] = None next_page: Optional[str] = None def _if_rate_limited(retry_state) -> bool: """Custom tenacity retry predicate for handling HTTP responses with status code 403 with specific ratelimit header. """ - attempt = retry_attempt(retry_state) + attempt = retry_state.outcome if attempt.failed: exc = attempt.exception() return ( isinstance(exc, HTTPError) and exc.response.status_code == codes.forbidden and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0 ) or is_retryable_exception(exc) return False def _parse_id_after(url: Optional[str]) -> Optional[int]: """Given an url, extract a return the 'id_after' query parameter associated value or None. This is the the repository id used for pagination purposes. """ if not url: return None # link: https://${project-api}/?...&id_after=2x... query_data = parse_qs(urlparse(url).query) page = query_data.get("id_after") if page and len(page) > 0: return int(page[0]) return None class GitLabLister(Lister[GitLabListerState, PageResult]): """List origins for a gitlab instance. By default, the lister runs in incremental mode: it lists all repositories, starting with the `last_seen_next_link` stored in the scheduler backend. Args: scheduler: a scheduler instance url: the api v4 url of the gitlab instance to visit (e.g. https://gitlab.com/api/v4/) instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...), url network location will be used if not provided incremental: defines if incremental listing is activated or not """ def __init__( self, scheduler, url: str, name: Optional[str] = "gitlab", instance: Optional[str] = None, credentials: Optional[CredentialsType] = None, incremental: bool = False, ): if name is not None: self.LISTER_NAME = name super().__init__( scheduler=scheduler, url=url.rstrip("/"), instance=instance, credentials=credentials, ) self.incremental = incremental self.last_page: Optional[str] = None self.per_page = 100 self.session = requests.Session() self.session.headers.update( {"Accept": "application/json", "User-Agent": USER_AGENT} ) if len(self.credentials) > 0: cred = random.choice(self.credentials) logger.info( "Using %s credentials from user %s", self.instance, cred["username"] ) api_token = cred["password"] if api_token: self.session.headers["Authorization"] = f"Bearer {api_token}" def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState: return GitLabListerState(**d) def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]: return asdict(state) @throttling_retry( retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) ) def get_page_result(self, url: str) -> PageResult: logger.debug("Fetching URL %s", url) response = self.session.get(url) if response.status_code != 200: logger.warning( "Unexpected HTTP status code %s on %s: %s", response.status_code, response.url, response.content, ) # GitLab API can return errors 500 when listing projects. # https://gitlab.com/gitlab-org/gitlab/-/issues/262629 # To avoid ending the listing prematurely, skip buggy URLs and move # to next pages. if response.status_code == 500: id_after = _parse_id_after(url) assert id_after is not None while True: next_id_after = id_after + self.per_page url = url.replace(f"id_after={id_after}", f"id_after={next_id_after}") response = self.session.get(url) if response.status_code == 200: break else: id_after = next_id_after else: response.raise_for_status() repositories: Tuple[Repository, ...] = tuple(response.json()) if hasattr(response, "links") and response.links.get("next"): next_page = response.links["next"]["url"] else: next_page = None return PageResult(repositories, next_page) def page_url(self, id_after: Optional[int] = None) -> str: parameters = { "pagination": "keyset", "order_by": "id", "sort": "asc", "simple": "true", "per_page": f"{self.per_page}", } if id_after is not None: parameters["id_after"] = str(id_after) return f"{self.url}/projects?{urlencode(parameters)}" def get_pages(self) -> Iterator[PageResult]: next_page: Optional[str] if self.incremental and self.state and self.state.last_seen_next_link: next_page = self.state.last_seen_next_link else: next_page = self.page_url() while next_page: self.last_page = next_page page_result = self.get_page_result(next_page) yield page_result next_page = page_result.next_page def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]: assert self.lister_obj.id is not None repositories = page_result.repositories if page_result.repositories else [] for repo in repositories: visit_type = repo.get("vcs_type", "git") visit_type = VCS_MAPPING.get(visit_type, visit_type) yield ListedOrigin( lister_id=self.lister_obj.id, url=repo["http_url_to_repo"], visit_type=visit_type, last_update=iso8601.parse_date(repo["last_activity_at"]), ) def commit_page(self, page_result: PageResult) -> None: """Update currently stored state using the latest listed "next" page if relevant. Relevancy is determined by the next_page link whose 'page' id must be strictly superior to the currently stored one. Note: this is a noop for full listing mode """ if self.incremental: # link: https://${project-api}/?...&page=2x... next_page = page_result.next_page if not next_page and self.last_page: next_page = self.last_page if next_page: id_after = _parse_id_after(next_page) previous_next_page = self.state.last_seen_next_link previous_id_after = _parse_id_after(previous_next_page) if previous_next_page is None or ( previous_id_after and id_after and previous_id_after < id_after ): self.state.last_seen_next_link = next_page def finalize(self) -> None: """finalize the lister state when relevant (see `fn:commit_page` for details) Note: this is a noop for full listing mode """ next_page = self.state.last_seen_next_link if self.incremental and next_page: # link: https://${project-api}/?...&page=2x... next_id_after = _parse_id_after(next_page) scheduler_state = self.get_state_from_scheduler() previous_next_id_after = _parse_id_after( scheduler_state.last_seen_next_link ) if (not previous_next_id_after and next_id_after) or ( previous_next_id_after and next_id_after and previous_next_id_after < next_id_after ): self.updated = True diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py index 3df721f..abc27ec 100644 --- a/swh/lister/pypi/lister.py +++ b/swh/lister/pypi/lister.py @@ -1,181 +1,181 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from dataclasses import asdict, dataclass from datetime import datetime, timezone import logging from time import sleep from typing import Any, Dict, Iterator, List, Optional, Tuple from xmlrpc.client import Fault, ServerProxy from tenacity.before_sleep import before_sleep_log -from swh.lister.utils import retry_attempt, throttling_retry +from swh.lister.utils import throttling_retry from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) # Type returned by the XML-RPC changelog call: # package, version, release timestamp, description, serial ChangelogEntry = Tuple[str, str, int, str, int] # Manipulated package updated type which is a subset information # of the ChangelogEntry type: package, max release date PackageUpdate = Tuple[str, datetime] # Type returned by listing a page of results PackageListPage = List[PackageUpdate] @dataclass class PyPIListerState: """State of PyPI lister""" last_serial: Optional[int] = None """Last seen serial when visiting the pypi instance""" def _if_rate_limited(retry_state) -> bool: """Custom tenacity retry predicate to handle xmlrpc client error: .. code:: xmlrpc.client.Fault: """ - attempt = retry_attempt(retry_state) + attempt = retry_state.outcome return attempt.failed and isinstance(attempt.exception(), Fault) def pypi_url(package_name: str) -> str: """Build pypi url out of a package name. """ return PyPILister.PACKAGE_URL.format(package_name=package_name) class PyPILister(Lister[PyPIListerState, PackageListPage]): """List origins from PyPI. """ LISTER_NAME = "pypi" INSTANCE = "pypi" # As of today only the main pypi.org is used PACKAGE_LIST_URL = "https://pypi.org/pypi" # XML-RPC url PACKAGE_URL = "https://pypi.org/project/{package_name}/" def __init__( self, scheduler: SchedulerInterface, credentials: Optional[CredentialsType] = None, ): super().__init__( scheduler=scheduler, url=self.PACKAGE_LIST_URL, instance=self.INSTANCE, credentials=credentials, ) # used as termination condition and if useful, becomes the new state when the # visit is done self.last_processed_serial: Optional[int] = None def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState: return PyPIListerState(last_serial=d.get("last_serial")) def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]: return asdict(state) @throttling_retry( retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) ) def _changelog_last_serial(self, client: ServerProxy) -> int: """Internal detail to allow throttling when calling the changelog last entry""" serial = client.changelog_last_serial() assert isinstance(serial, int) return serial @throttling_retry( retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING) ) def _changelog_since_serial( self, client: ServerProxy, serial: int ) -> List[ChangelogEntry]: """Internal detail to allow throttling when calling the changelog listing""" sleep(1) # to avoid the initial warning about throttling return client.changelog_since_serial(serial) # type: ignore def get_pages(self) -> Iterator[PackageListPage]: """Iterate other changelog events per package, determine the max release date for that package and use that max release date as last_update. When the execution is done, this will also set the self.last_processed_serial attribute so we can finalize the state of the lister for the next visit. Yields: List of Tuple of (package-name, max release-date) """ client = ServerProxy(self.url) last_processed_serial = -1 if self.state.last_serial is not None: last_processed_serial = self.state.last_serial upstream_last_serial = self._changelog_last_serial(client) # Paginate through result of pypi, until we read everything while last_processed_serial < upstream_last_serial: updated_packages = defaultdict(list) for package, _, release_date, _, serial in self._changelog_since_serial( client, last_processed_serial ): updated_packages[package].append(release_date) # Compute the max serial so we can stop when done last_processed_serial = max(last_processed_serial, serial) # Returns pages of result to flush regularly yield [ ( pypi_url(package), datetime.fromtimestamp(max(release_dates)).replace( tzinfo=timezone.utc ), ) for package, release_dates in updated_packages.items() ] self.last_processed_serial = upstream_last_serial def get_origins_from_page( self, packages: PackageListPage ) -> Iterator[ListedOrigin]: """Convert a page of PyPI repositories into a list of ListedOrigins.""" assert self.lister_obj.id is not None for origin, last_update in packages: yield ListedOrigin( lister_id=self.lister_obj.id, url=origin, visit_type="pypi", last_update=last_update, ) def finalize(self): """Finalize the visit state by updating with the new last_serial if updates actually happened. """ self.updated = ( self.state and self.state.last_serial and self.last_processed_serial and self.state.last_serial < self.last_processed_serial ) or (not self.state.last_serial and self.last_processed_serial) if self.updated: self.state.last_serial = self.last_processed_serial diff --git a/swh/lister/tests/test_utils.py b/swh/lister/tests/test_utils.py index 763f743..acb73fe 100644 --- a/swh/lister/tests/test_utils.py +++ b/swh/lister/tests/test_utils.py @@ -1,120 +1,113 @@ -# Copyright (C) 2018-2020 the Software Heritage developers +# Copyright (C) 2018-2021 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest import requests from requests.status_codes import codes from tenacity.wait import wait_fixed from swh.lister.utils import ( MAX_NUMBER_ATTEMPTS, WAIT_EXP_BASE, split_range, throttling_retry, ) @pytest.mark.parametrize( "total_pages,nb_pages,expected_ranges", [ (14, 5, [(0, 4), (5, 9), (10, 14)]), (19, 10, [(0, 9), (10, 19)]), (20, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]), (21, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21),],), ], ) def test_split_range(total_pages, nb_pages, expected_ranges): actual_ranges = list(split_range(total_pages, nb_pages)) assert actual_ranges == expected_ranges @pytest.mark.parametrize("total_pages,nb_pages", [(None, 1), (100, None)]) def test_split_range_errors(total_pages, nb_pages): for total_pages, nb_pages in [(None, 1), (100, None)]: with pytest.raises(TypeError): next(split_range(total_pages, nb_pages)) TEST_URL = "https://example.og/api/repositories" @throttling_retry() def make_request(): response = requests.get(TEST_URL) response.raise_for_status() return response def assert_sleep_calls(mocker, mock_sleep, sleep_params): - try: - mock_sleep.assert_has_calls([mocker.call(param) for param in sleep_params]) - except AssertionError: - # tenacity < 5.1 has a different behavior for wait_exponential - # https://github.com/jd/tenacity/commit/aac4307a0aa30d7befd0ebe4212ee4fc69083a95 - mock_sleep.assert_has_calls( - [mocker.call(param * WAIT_EXP_BASE) for param in sleep_params] - ) + mock_sleep.assert_has_calls([mocker.call(param) for param in sleep_params]) def test_throttling_retry(requests_mock, mocker): data = {"result": {}} requests_mock.get( TEST_URL, [ {"status_code": codes.too_many_requests}, {"status_code": codes.too_many_requests}, {"status_code": codes.ok, "json": data}, ], ) mock_sleep = mocker.patch.object(make_request.retry, "sleep") response = make_request() assert_sleep_calls(mocker, mock_sleep, [1, WAIT_EXP_BASE]) assert response.json() == data def test_throttling_retry_max_attemps(requests_mock, mocker): requests_mock.get( TEST_URL, [{"status_code": codes.too_many_requests}] * (MAX_NUMBER_ATTEMPTS), ) mock_sleep = mocker.patch.object(make_request.retry, "sleep") with pytest.raises(requests.exceptions.HTTPError) as e: make_request() assert e.value.response.status_code == codes.too_many_requests assert_sleep_calls( mocker, mock_sleep, [float(WAIT_EXP_BASE ** i) for i in range(MAX_NUMBER_ATTEMPTS - 1)], ) @throttling_retry(wait=wait_fixed(WAIT_EXP_BASE)) def make_request_wait_fixed(): response = requests.get(TEST_URL) response.raise_for_status() return response def test_throttling_retry_wait_fixed(requests_mock, mocker): requests_mock.get( TEST_URL, [ {"status_code": codes.too_many_requests}, {"status_code": codes.too_many_requests}, {"status_code": codes.ok}, ], ) mock_sleep = mocker.patch.object(make_request_wait_fixed.retry, "sleep") make_request_wait_fixed() assert_sleep_calls(mocker, mock_sleep, [WAIT_EXP_BASE] * 2) diff --git a/swh/lister/utils.py b/swh/lister/utils.py index 9df6907..ea4a989 100644 --- a/swh/lister/utils.py +++ b/swh/lister/utils.py @@ -1,134 +1,121 @@ # Copyright (C) 2018-2021 the Software Heritage developers # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Callable, Iterator, Tuple from requests.exceptions import ConnectionError, HTTPError from requests.status_codes import codes from tenacity import retry as tenacity_retry from tenacity.stop import stop_after_attempt from tenacity.wait import wait_exponential def split_range(total_pages: int, nb_pages: int) -> Iterator[Tuple[int, int]]: """Split `total_pages` into mostly `nb_pages` ranges. In some cases, the last range can have one more element. >>> list(split_range(19, 10)) [(0, 9), (10, 19)] >>> list(split_range(20, 3)) [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)] >>> list(split_range(21, 3)) [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21)] """ prev_index = None for index in range(0, total_pages, nb_pages): if index is not None and prev_index is not None: yield prev_index, index - 1 prev_index = index if index != total_pages: yield index, total_pages def is_throttling_exception(e: Exception) -> bool: """ Checks if an exception is a requests.exception.HTTPError for a response with status code 429 (too many requests). """ return ( isinstance(e, HTTPError) and e.response.status_code == codes.too_many_requests ) def is_retryable_exception(e: Exception) -> bool: """ Checks if an exception is worth retrying (connection, throttling or a server error). """ is_connection_error = isinstance(e, ConnectionError) is_500_error = isinstance(e, HTTPError) and e.response.status_code >= 500 return is_connection_error or is_throttling_exception(e) or is_500_error -def retry_attempt(retry_state): - """ - Utility function to get last retry attempt info based on the - tenacity version (as debian buster packages version 4.12). - """ - try: - attempt = retry_state.outcome - except AttributeError: - # tenacity < 5.0 - attempt = retry_state - return attempt - - def retry_if_exception(retry_state, predicate: Callable[[Exception], bool]) -> bool: """ Custom tenacity retry predicate for handling exceptions with the given predicate. """ - attempt = retry_attempt(retry_state) + attempt = retry_state.outcome if attempt.failed: exception = attempt.exception() return predicate(exception) return False def retry_if_throttling(retry_state) -> bool: """ Custom tenacity retry predicate for handling HTTP responses with status code 429 (too many requests). """ return retry_if_exception(retry_state, is_throttling_exception) def retry_policy_generic(retry_state) -> bool: """ Custom tenacity retry predicate for handling failed requests: - ConnectionError - Server errors (status >= 500) - Throttling errors (status == 429) This does not handle 404, 403 or other status codes. """ return retry_if_exception(retry_state, is_retryable_exception) WAIT_EXP_BASE = 10 MAX_NUMBER_ATTEMPTS = 5 def throttling_retry( retry=retry_if_throttling, wait=wait_exponential(exp_base=WAIT_EXP_BASE), stop=stop_after_attempt(max_attempt_number=MAX_NUMBER_ATTEMPTS), **retry_args, ): """ Decorator based on `tenacity` for retrying a function possibly raising requests.exception.HTTPError for status code 429 (too many requests). It provides a default configuration that should work properly in most cases but all `tenacity.retry` parameters can also be overridden in client code. When the mmaximum of attempts is reached, the HTTPError exception will then be reraised. Args: retry: function defining request retry condition (default to 429 status code) https://tenacity.readthedocs.io/en/latest/#whether-to-retry wait: function defining wait strategy before retrying (default to exponential backoff) https://tenacity.readthedocs.io/en/latest/#waiting-before-retrying stop: function defining when to stop retrying (default after 5 attempts) https://tenacity.readthedocs.io/en/latest/#stopping """ return tenacity_retry(retry=retry, wait=wait, stop=stop, reraise=True, **retry_args)