Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F11023630
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
25 KB
Subscribers
None
View Options
diff --git a/requirements.txt b/requirements.txt
index 8d9bb82..c57eecc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,8 +1,8 @@
python_debian
requests
setuptools
iso8601
beautifulsoup4
launchpadlib
-tenacity
+tenacity >= 6.2
xmltodict
diff --git a/swh/lister/gitlab/lister.py b/swh/lister/gitlab/lister.py
index 6c3f7e6..61006b0 100644
--- a/swh/lister/gitlab/lister.py
+++ b/swh/lister/gitlab/lister.py
@@ -1,265 +1,265 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
import logging
import random
from typing import Any, Dict, Iterator, Optional, Tuple
from urllib.parse import parse_qs, urlencode, urlparse
import iso8601
import requests
from requests.exceptions import HTTPError
from requests.status_codes import codes
from tenacity.before_sleep import before_sleep_log
from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, Lister
-from swh.lister.utils import is_retryable_exception, retry_attempt, throttling_retry
+from swh.lister.utils import is_retryable_exception, throttling_retry
from swh.scheduler.model import ListedOrigin
logger = logging.getLogger(__name__)
# Some instances provide an hg_git type, which can be ingested as hg origins
VCS_MAPPING = {"hg_git": "hg"}
@dataclass
class GitLabListerState:
    """State of the GitLabLister, carried over between incremental passes."""

    # Full URL of a projects page; the lister compares its `id_after` query
    # parameter to decide whether the state has advanced.
    last_seen_next_link: Optional[str] = None
    """Last link header (not visited yet) during an incremental pass
    """
Repository = Dict[str, Any]
@dataclass
class PageResult:
    """Result from a query to a gitlab project api page."""

    # Repositories decoded from the JSON body of the response.
    repositories: Optional[Tuple[Repository, ...]] = None
    # URL of the following page (taken from the response "next" link), or None
    # when the response advertises no further page.
    next_page: Optional[str] = None
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate for handling HTTP responses with status code 403
with specific ratelimit header.
"""
- attempt = retry_attempt(retry_state)
+ attempt = retry_state.outcome
if attempt.failed:
exc = attempt.exception()
return (
isinstance(exc, HTTPError)
and exc.response.status_code == codes.forbidden
and int(exc.response.headers.get("RateLimit-Remaining", "0")) == 0
) or is_retryable_exception(exc)
return False
def _parse_id_after(url: Optional[str]) -> Optional[int]:
"""Given an url, extract a return the 'id_after' query parameter associated value
or None.
This is the the repository id used for pagination purposes.
"""
if not url:
return None
# link: https://${project-api}/?...&id_after=2x...
query_data = parse_qs(urlparse(url).query)
page = query_data.get("id_after")
if page and len(page) > 0:
return int(page[0])
return None
class GitLabLister(Lister[GitLabListerState, PageResult]):
    """List origins for a gitlab instance.

    By default, the lister runs a full listing starting from the first projects
    page. When `incremental` is set, it resumes from the `last_seen_next_link`
    stored in the scheduler backend.

    Args:
        scheduler: a scheduler instance
        url: the api v4 url of the gitlab instance to visit (e.g.
          https://gitlab.com/api/v4/)
        name: when not None, overrides the lister name (`LISTER_NAME`)
        instance: a specific instance name (e.g. gitlab, tor, git-kernel, ...),
          url network location will be used if not provided
        credentials: optional credentials; when some are available for this
          lister, one is picked at random and its "password" entry is sent as
          a bearer token
        incremental: defines if incremental listing is activated or not

    """

    def __init__(
        self,
        scheduler,
        url: str,
        name: Optional[str] = "gitlab",
        instance: Optional[str] = None,
        credentials: Optional[CredentialsType] = None,
        incremental: bool = False,
    ):
        # The name must be set before the base class initialization runs.
        if name is not None:
            self.LISTER_NAME = name
        super().__init__(
            scheduler=scheduler,
            url=url.rstrip("/"),
            instance=instance,
            credentials=credentials,
        )
        self.incremental = incremental
        # Last page url requested by get_pages, used by commit_page as a
        # fallback when a page result has no "next" link.
        self.last_page: Optional[str] = None
        self.per_page = 100

        self.session = requests.Session()
        self.session.headers.update(
            {"Accept": "application/json", "User-Agent": USER_AGENT}
        )

        if self.credentials:
            cred = random.choice(self.credentials)
            logger.info(
                "Using %s credentials from user %s", self.instance, cred["username"]
            )
            api_token = cred["password"]
            if api_token:
                self.session.headers["Authorization"] = f"Bearer {api_token}"

    def state_from_dict(self, d: Dict[str, Any]) -> GitLabListerState:
        """Build the lister state out of its dict representation."""
        return GitLabListerState(**d)

    def state_to_dict(self, state: GitLabListerState) -> Dict[str, Any]:
        """Convert the lister state to its dict representation."""
        return asdict(state)

    @throttling_retry(
        retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
    )
    def get_page_result(self, url: str) -> PageResult:
        """Fetch one projects page and return its repositories and next link.

        Pages answering with an error 500 are skipped by moving the `id_after`
        parameter forward by `per_page`, until a page answers with another
        status.

        Args:
            url: url of the projects page to fetch

        Returns:
            The repositories of the first page that could be read, along with
            the url of the following page (None when there is none).

        Raises:
            requests.exceptions.HTTPError: when the last response carries an
              error status; this includes a 500 on a url without `id_after`,
              since there is then no id to skip from.

        """
        logger.debug("Fetching URL %s", url)
        response = self.session.get(url)

        if response.status_code != 200:
            logger.warning(
                "Unexpected HTTP status code %s on %s: %s",
                response.status_code,
                response.url,
                response.content,
            )

        # GitLab API can return errors 500 when listing projects.
        # https://gitlab.com/gitlab-org/gitlab/-/issues/262629
        # To avoid ending the listing prematurely, skip buggy URLs and move
        # to next pages.
        if response.status_code == 500:
            id_after = _parse_id_after(url)
            # Only keep skipping on errors 500: any other failure (e.g. rate
            # limiting) must reach raise_for_status below so the retry policy
            # handles it instead of silently skipping pages.
            # NOTE(review): the number of skipped pages is unbounded, as in the
            # previous implementation.
            while id_after is not None and response.status_code == 500:
                next_id_after = id_after + self.per_page
                url = url.replace(f"id_after={id_after}", f"id_after={next_id_after}")
                response = self.session.get(url)
                id_after = next_id_after

        # No-op for a successful response
        response.raise_for_status()

        repositories: Tuple[Repository, ...] = tuple(response.json())
        if hasattr(response, "links") and response.links.get("next"):
            next_page = response.links["next"]["url"]
        else:
            next_page = None

        return PageResult(repositories, next_page)

    def page_url(self, id_after: Optional[int] = None) -> str:
        """Build the url of a projects page using keyset pagination.

        Args:
            id_after: when provided, only list projects whose id is greater
              than this one

        """
        parameters = {
            "pagination": "keyset",
            "order_by": "id",
            "sort": "asc",
            "simple": "true",
            "per_page": f"{self.per_page}",
        }
        if id_after is not None:
            parameters["id_after"] = str(id_after)

        return f"{self.url}/projects?{urlencode(parameters)}"

    def get_pages(self) -> Iterator[PageResult]:
        """Yield page results, following "next" links until there is none left.

        In incremental mode, listing starts from the stored
        `last_seen_next_link` if any; otherwise from the first page.

        """
        next_page: Optional[str]
        if self.incremental and self.state and self.state.last_seen_next_link:
            next_page = self.state.last_seen_next_link
        else:
            next_page = self.page_url()

        while next_page:
            self.last_page = next_page
            page_result = self.get_page_result(next_page)
            yield page_result
            next_page = page_result.next_page

    def get_origins_from_page(self, page_result: PageResult) -> Iterator[ListedOrigin]:
        """Convert the repositories of a page result into ListedOrigin objects."""
        assert self.lister_obj.id is not None

        repositories = page_result.repositories if page_result.repositories else []
        for repo in repositories:
            # Repositories without an explicit vcs type are listed as git
            visit_type = repo.get("vcs_type", "git")
            visit_type = VCS_MAPPING.get(visit_type, visit_type)
            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                url=repo["http_url_to_repo"],
                visit_type=visit_type,
                last_update=iso8601.parse_date(repo["last_activity_at"]),
            )

    def commit_page(self, page_result: PageResult) -> None:
        """Update currently stored state using the latest listed "next" page if relevant.

        Relevancy is determined by the next_page link whose 'id_after' parameter
        must be strictly superior to the currently stored one.

        Note: this is a noop for full listing mode

        """
        if self.incremental:
            # link: https://${project-api}/?...&id_after=2x...
            next_page = page_result.next_page
            # On the last page (no "next" link), fall back to the page that
            # was just requested.
            if not next_page and self.last_page:
                next_page = self.last_page

            if next_page:
                id_after = _parse_id_after(next_page)
                previous_next_page = self.state.last_seen_next_link
                previous_id_after = _parse_id_after(previous_next_page)

                if previous_next_page is None or (
                    previous_id_after and id_after and previous_id_after < id_after
                ):
                    self.state.last_seen_next_link = next_page

    def finalize(self) -> None:
        """finalize the lister state when relevant (see `fn:commit_page` for details)

        Note: this is a noop for full listing mode

        """
        next_page = self.state.last_seen_next_link
        if self.incremental and next_page:
            # link: https://${project-api}/?...&id_after=2x...
            next_id_after = _parse_id_after(next_page)
            scheduler_state = self.get_state_from_scheduler()
            previous_next_id_after = _parse_id_after(
                scheduler_state.last_seen_next_link
            )

            # Flag the state as updated only when it moved past what the
            # scheduler already knows
            if (not previous_next_id_after and next_id_after) or (
                previous_next_id_after
                and next_id_after
                and previous_next_id_after < next_id_after
            ):
                self.updated = True
diff --git a/swh/lister/pypi/lister.py b/swh/lister/pypi/lister.py
index 3df721f..abc27ec 100644
--- a/swh/lister/pypi/lister.py
+++ b/swh/lister/pypi/lister.py
@@ -1,181 +1,181 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
from time import sleep
from typing import Any, Dict, Iterator, List, Optional, Tuple
from xmlrpc.client import Fault, ServerProxy
from tenacity.before_sleep import before_sleep_log
-from swh.lister.utils import retry_attempt, throttling_retry
+from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
from ..pattern import CredentialsType, Lister
logger = logging.getLogger(__name__)
# Type returned by the XML-RPC changelog call:
# package, version, release timestamp, description, serial
ChangelogEntry = Tuple[str, str, int, str, int]
# Package update as manipulated by the lister, a subset of the ChangelogEntry
# information: package url, max release date
PackageUpdate = Tuple[str, datetime]
# Type returned by listing a page of results
PackageListPage = List[PackageUpdate]
@dataclass
class PyPIListerState:
    """State of PyPI lister"""

    # None until a first serial has been recorded; listing then starts from
    # the beginning of the changelog.
    last_serial: Optional[int] = None
    """Last seen serial when visiting the pypi instance"""
def _if_rate_limited(retry_state) -> bool:
"""Custom tenacity retry predicate to handle xmlrpc client error:
.. code::
xmlrpc.client.Fault: <Fault -32500: 'HTTPTooManyRequests: The action could not
be performed because there were too many requests by the client. Limit may reset
in 1 seconds.'>
"""
- attempt = retry_attempt(retry_state)
+ attempt = retry_state.outcome
return attempt.failed and isinstance(attempt.exception(), Fault)
def pypi_url(package_name: str) -> str:
    """Build pypi url out of a package name.

    The name is substituted into the `PyPILister.PACKAGE_URL` template.

    """
    return PyPILister.PACKAGE_URL.format(package_name=package_name)
class PyPILister(Lister[PyPIListerState, PackageListPage]):
    """List origins from PyPI.

    Packages are discovered through the XML-RPC changelog of the index. The
    serial of the last changelog event seen is kept as lister state, and the
    next visit starts from it.

    """

    LISTER_NAME = "pypi"
    INSTANCE = "pypi"  # As of today only the main pypi.org is used

    PACKAGE_LIST_URL = "https://pypi.org/pypi"  # XML-RPC url
    PACKAGE_URL = "https://pypi.org/project/{package_name}/"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        credentials: Optional[CredentialsType] = None,
    ):
        super().__init__(
            scheduler=scheduler,
            url=self.PACKAGE_LIST_URL,
            instance=self.INSTANCE,
            credentials=credentials,
        )

        # used as termination condition and if useful, becomes the new state when the
        # visit is done
        self.last_processed_serial: Optional[int] = None

    def state_from_dict(self, d: Dict[str, Any]) -> PyPIListerState:
        """Build the lister state out of its dict representation."""
        return PyPIListerState(last_serial=d.get("last_serial"))

    def state_to_dict(self, state: PyPIListerState) -> Dict[str, Any]:
        """Convert the lister state to its dict representation."""
        return asdict(state)

    @throttling_retry(
        retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
    )
    def _changelog_last_serial(self, client: ServerProxy) -> int:
        """Internal detail to allow throttling when calling the changelog last entry"""
        serial = client.changelog_last_serial()
        assert isinstance(serial, int)
        return serial

    @throttling_retry(
        retry=_if_rate_limited, before_sleep=before_sleep_log(logger, logging.WARNING)
    )
    def _changelog_since_serial(
        self, client: ServerProxy, serial: int
    ) -> List[ChangelogEntry]:
        """Internal detail to allow throttling when calling the changelog listing"""
        sleep(1)  # to avoid the initial warning about throttling
        return client.changelog_since_serial(serial)  # type: ignore

    def get_pages(self) -> Iterator[PackageListPage]:
        """Iterate over changelog events per package, determine the max release date for
        that package and use that max release date as last_update. When the execution
        is done, this will also set the self.last_processed_serial attribute so we can
        finalize the state of the lister for the next visit.

        Yields:
            List of Tuple of (package url, max release-date)

        """
        client = ServerProxy(self.url)

        # Start from the stored serial if any, from the very beginning otherwise
        last_processed_serial = -1
        if self.state.last_serial is not None:
            last_processed_serial = self.state.last_serial
        upstream_last_serial = self._changelog_last_serial(client)

        # Paginate through result of pypi, until we read everything
        while last_processed_serial < upstream_last_serial:
            updated_packages = defaultdict(list)

            for package, _, release_date, _, serial in self._changelog_since_serial(
                client, last_processed_serial
            ):
                updated_packages[package].append(release_date)
                # Compute the max serial so we can stop when done
                last_processed_serial = max(last_processed_serial, serial)

            # Returns pages of result to flush regularly
            yield [
                (
                    pypi_url(package),
                    # Interpret the timestamp directly as UTC: converting to a
                    # naive local datetime and then tagging it as UTC would
                    # shift the date by the local UTC offset.
                    datetime.fromtimestamp(max(release_dates), tz=timezone.utc),
                )
                for package, release_dates in updated_packages.items()
            ]

        self.last_processed_serial = upstream_last_serial

    def get_origins_from_page(
        self, packages: PackageListPage
    ) -> Iterator[ListedOrigin]:
        """Convert a page of PyPI repositories into a list of ListedOrigins."""
        assert self.lister_obj.id is not None

        for origin, last_update in packages:
            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                url=origin,
                visit_type="pypi",
                last_update=last_update,
            )

    def finalize(self):
        """Finalize the visit state by updating with the new last_serial if updates
        actually happened.

        """
        # Coerce to a real boolean: the bare and/or chain would otherwise
        # store an int or None in self.updated.
        self.updated = bool(
            (
                self.state
                and self.state.last_serial
                and self.last_processed_serial
                and self.state.last_serial < self.last_processed_serial
            )
            or (not self.state.last_serial and self.last_processed_serial)
        )
        if self.updated:
            self.state.last_serial = self.last_processed_serial
diff --git a/swh/lister/tests/test_utils.py b/swh/lister/tests/test_utils.py
index 763f743..acb73fe 100644
--- a/swh/lister/tests/test_utils.py
+++ b/swh/lister/tests/test_utils.py
@@ -1,120 +1,113 @@
-# Copyright (C) 2018-2020 the Software Heritage developers
+# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pytest
import requests
from requests.status_codes import codes
from tenacity.wait import wait_fixed
from swh.lister.utils import (
MAX_NUMBER_ATTEMPTS,
WAIT_EXP_BASE,
split_range,
throttling_retry,
)
@pytest.mark.parametrize(
    "total_pages,nb_pages,expected_ranges",
    [
        (14, 5, [(0, 4), (5, 9), (10, 14)]),
        (19, 10, [(0, 9), (10, 19)]),
        (20, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]),
        (21, 3, [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21),],),
    ],
)
def test_split_range(total_pages, nb_pages, expected_ranges):
    """split_range yields the expected (start, end) pairs, the last one ending at
    total_pages."""
    actual_ranges = list(split_range(total_pages, nb_pages))
    assert actual_ranges == expected_ranges
@pytest.mark.parametrize("total_pages,nb_pages", [(None, 1), (100, None)])
def test_split_range_errors(total_pages, nb_pages):
    """split_range raises TypeError when one of its arguments is None."""
    # The inputs come from the parametrization: looping over the same list
    # here would shadow them and run every input in each parametrized case.
    with pytest.raises(TypeError):
        # split_range is a generator, the error only shows on first iteration
        next(split_range(total_pages, nb_pages))
TEST_URL = "https://example.og/api/repositories"
@throttling_retry()
def make_request():
    """Query TEST_URL under the default throttling_retry configuration.

    Error statuses are turned into exceptions so the retry policy can see them.
    """
    response = requests.get(TEST_URL)
    response.raise_for_status()
    return response
def assert_sleep_calls(mocker, mock_sleep, sleep_params):
- try:
- mock_sleep.assert_has_calls([mocker.call(param) for param in sleep_params])
- except AssertionError:
- # tenacity < 5.1 has a different behavior for wait_exponential
- # https://github.com/jd/tenacity/commit/aac4307a0aa30d7befd0ebe4212ee4fc69083a95
- mock_sleep.assert_has_calls(
- [mocker.call(param * WAIT_EXP_BASE) for param in sleep_params]
- )
+ mock_sleep.assert_has_calls([mocker.call(param) for param in sleep_params])
def test_throttling_retry(requests_mock, mocker):
    """Two throttled responses followed by a success: the request ends up
    succeeding after two sleeps."""
    data = {"result": {}}
    requests_mock.get(
        TEST_URL,
        [
            {"status_code": codes.too_many_requests},
            {"status_code": codes.too_many_requests},
            {"status_code": codes.ok, "json": data},
        ],
    )

    # Patch the sleep of the retrying object so the test does not really wait
    mock_sleep = mocker.patch.object(make_request.retry, "sleep")

    response = make_request()

    assert_sleep_calls(mocker, mock_sleep, [1, WAIT_EXP_BASE])

    assert response.json() == data
def test_throttling_retry_max_attemps(requests_mock, mocker):
    """When every attempt is throttled, the HTTPError is raised once the maximum
    number of attempts is reached."""
    requests_mock.get(
        TEST_URL, [{"status_code": codes.too_many_requests}] * (MAX_NUMBER_ATTEMPTS),
    )

    # Patch the sleep of the retrying object so the test does not really wait
    mock_sleep = mocker.patch.object(make_request.retry, "sleep")

    with pytest.raises(requests.exceptions.HTTPError) as e:
        make_request()

    assert e.value.response.status_code == codes.too_many_requests

    # One sleep between consecutive attempts, hence one less than the attempts
    assert_sleep_calls(
        mocker,
        mock_sleep,
        [float(WAIT_EXP_BASE ** i) for i in range(MAX_NUMBER_ATTEMPTS - 1)],
    )
@throttling_retry(wait=wait_fixed(WAIT_EXP_BASE))
def make_request_wait_fixed():
    """Query TEST_URL with the wait strategy overridden by a fixed delay."""
    response = requests.get(TEST_URL)
    response.raise_for_status()
    return response
def test_throttling_retry_wait_fixed(requests_mock, mocker):
    """With a fixed wait strategy, each retry sleeps for the same duration."""
    requests_mock.get(
        TEST_URL,
        [
            {"status_code": codes.too_many_requests},
            {"status_code": codes.too_many_requests},
            {"status_code": codes.ok},
        ],
    )

    # Patch the sleep of the retrying object so the test does not really wait
    mock_sleep = mocker.patch.object(make_request_wait_fixed.retry, "sleep")

    make_request_wait_fixed()

    assert_sleep_calls(mocker, mock_sleep, [WAIT_EXP_BASE] * 2)
diff --git a/swh/lister/utils.py b/swh/lister/utils.py
index 9df6907..ea4a989 100644
--- a/swh/lister/utils.py
+++ b/swh/lister/utils.py
@@ -1,134 +1,121 @@
# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Callable, Iterator, Tuple
from requests.exceptions import ConnectionError, HTTPError
from requests.status_codes import codes
from tenacity import retry as tenacity_retry
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential
def split_range(total_pages: int, nb_pages: int) -> Iterator[Tuple[int, int]]:
    """Split the indexes from 0 to `total_pages` into consecutive inclusive ranges of
    `nb_pages` elements. The last range always ends at `total_pages`, so it can be
    shorter than the others or have one more element.

    Nothing is yielded when there is no index to start a range from (e.g.
    `total_pages` is 0).

    >>> list(split_range(19, 10))
    [(0, 9), (10, 19)]
    >>> list(split_range(20, 3))
    [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 20)]
    >>> list(split_range(21, 3))
    [(0, 2), (3, 5), (6, 8), (9, 11), (12, 14), (15, 17), (18, 21)]
    >>> list(split_range(0, 3))
    []

    Raises:
        TypeError: on first iteration, when an argument is not an integer
        ValueError: on first iteration, when `nb_pages` is 0

    """
    prev_index = None
    # Stays None when the range below is empty; reading the loop variable
    # after an empty loop would otherwise raise UnboundLocalError.
    index = None
    for index in range(0, total_pages, nb_pages):
        if prev_index is not None:
            yield prev_index, index - 1
        prev_index = index

    # The last start index is always strictly before total_pages, so the
    # final range only depends on the loop having run at all.
    if index is not None:
        yield index, total_pages
def is_throttling_exception(e: Exception) -> bool:
    """
    Checks if an exception is a requests.exception.HTTPError for
    a response with status code 429 (too many requests).

    An HTTPError carrying no response is not considered a throttling one.
    """
    if not isinstance(e, HTTPError):
        return False
    # HTTPError can be instantiated without a response: guard against it so
    # the check cannot fail with an AttributeError.
    response = e.response
    return response is not None and response.status_code == codes.too_many_requests
def is_retryable_exception(e: Exception) -> bool:
    """
    Checks if an exception is worth retrying (connection, throttling or a server error).

    An HTTPError carrying no response is not considered retryable.
    """
    if isinstance(e, ConnectionError):
        return True
    # Remaining cases need the status code of an HTTP response; HTTPError can
    # be instantiated without one.
    if not isinstance(e, HTTPError) or e.response is None:
        return False
    return is_throttling_exception(e) or e.response.status_code >= 500
-def retry_attempt(retry_state):
- """
- Utility function to get last retry attempt info based on the
- tenacity version (as debian buster packages version 4.12).
- """
- try:
- attempt = retry_state.outcome
- except AttributeError:
- # tenacity < 5.0
- attempt = retry_state
- return attempt
-
-
def retry_if_exception(retry_state, predicate: Callable[[Exception], bool]) -> bool:
"""
Custom tenacity retry predicate for handling exceptions with the given predicate.
"""
- attempt = retry_attempt(retry_state)
+ attempt = retry_state.outcome
if attempt.failed:
exception = attempt.exception()
return predicate(exception)
return False
def retry_if_throttling(retry_state) -> bool:
    """
    Custom tenacity retry predicate for handling HTTP responses with
    status code 429 (too many requests).

    Delegates to retry_if_exception with is_throttling_exception as predicate.
    """
    return retry_if_exception(retry_state, is_throttling_exception)
def retry_policy_generic(retry_state) -> bool:
    """
    Custom tenacity retry predicate for handling failed requests:
    - ConnectionError
    - Server errors (status >= 500)
    - Throttling errors (status == 429)

    This does not handle 404, 403 or other status codes.

    Delegates to retry_if_exception with is_retryable_exception as predicate.
    """
    return retry_if_exception(retry_state, is_retryable_exception)
# Base of the exponential backoff used as default wait strategy by throttling_retry
WAIT_EXP_BASE = 10
# Default number of attempts after which throttling_retry stops retrying
MAX_NUMBER_ATTEMPTS = 5
def throttling_retry(
    retry=retry_if_throttling,
    wait=wait_exponential(exp_base=WAIT_EXP_BASE),
    stop=stop_after_attempt(max_attempt_number=MAX_NUMBER_ATTEMPTS),
    **retry_args,
):
    """
    Decorator based on `tenacity` for retrying a function possibly raising
    requests.exception.HTTPError for status code 429 (too many requests).

    It provides a default configuration that should work properly in most
    cases but all `tenacity.retry` parameters can also be overridden in client
    code.

    When the maximum number of attempts is reached, the HTTPError exception will
    then be reraised.

    Args:
        retry: function defining request retry condition (default to 429 status code)
            https://tenacity.readthedocs.io/en/latest/#whether-to-retry

        wait: function defining wait strategy before retrying (default to exponential
            backoff) https://tenacity.readthedocs.io/en/latest/#waiting-before-retrying

        stop: function defining when to stop retrying (default after 5 attempts)
            https://tenacity.readthedocs.io/en/latest/#stopping

        retry_args: any other keyword argument, forwarded as is to `tenacity.retry`

    """
    # reraise=True: once retries are exhausted, the last exception raised by the
    # decorated function is raised again
    return tenacity_retry(retry=retry, wait=wait, stop=stop, reraise=True, **retry_args)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Sep 18, 4:54 PM (1 d, 1 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3272985
Attached To
rDLS Listers
Event Timeline
Log In to Comment