diff --git a/requirements-http.txt b/requirements-http.txt index 1a0bb79..8ca9444 100644 --- a/requirements-http.txt +++ b/requirements-http.txt @@ -1,8 +1,9 @@ # requirements for swh.core.api aiohttp aiohttp_utils >= 3.1.1 blinker # dependency of sentry-sdk[flask] flask iso8601 msgpack >= 1.0.0 requests +tenacity diff --git a/swh/core/github/utils.py b/swh/core/github/utils.py index 8721401..c21c8bb 100644 --- a/swh/core/github/utils.py +++ b/swh/core/github/utils.py @@ -1,48 +1,214 @@ -# Copyright (C) 2022 The Software Heritage developers +# Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging +import random import re -from typing import Optional +import time +from typing import Dict, List, Optional import requests +from tenacity import ( + retry, + retry_any, + retry_if_exception_type, + retry_if_result, + wait_exponential, +) GITHUB_PATTERN = re.compile(r"https?://github.com/(?P.*)") +logger = logging.getLogger(__name__) + + def _url_github_html(user_repo: str) -> str: """Given the user repo, returns the expected github html url.""" return f"https://github.com/{user_repo}" def _url_github_api(user_repo: str) -> str: """Given the user_repo, returns the expected github api url.""" return f"https://api.github.com/repos/{user_repo}" def _sanitize_github_url(url: str) -> str: """Sanitize github url.""" return url.lower().rstrip("/").rstrip(".git").rstrip("/") def get_canonical_github_origin_url(url: str) -> Optional[str]: """Retrieve canonical github url out of an url if any or None otherwise. This triggers an anonymous http request to the github api url to determine the canonical repository url. """ url_ = url.lower() match = GITHUB_PATTERN.match(url_) if not match: return url user_repo = _sanitize_github_url(match.groupdict()["user_repo"]) response = requests.get(_url_github_api(user_repo)) if response.status_code != 200: return None data = response.json() return data["html_url"] + + +class RateLimited(Exception): + def __init__(self, response): + self.reset_time: Optional[int] + + # Figure out how long we need to sleep because of that rate limit + ratelimit_reset = response.headers.get("X-Ratelimit-Reset") + retry_after = response.headers.get("Retry-After") + if ratelimit_reset is not None: + self.reset_time = int(ratelimit_reset) + elif retry_after is not None: + self.reset_time = int(time.time()) + int(retry_after) + 1 + else: + logger.warning( + "Received a rate-limit-like status code %s, but no rate-limit " + "headers set. Response content: %s", + response.status_code, + response.content, + ) + self.reset_time = None + self.response = response + + +class MissingRateLimitReset(Exception): + pass + + +class GitHubSession: + """Manages a :class:`requests.Session` with (optionally) multiple credentials, + and cycles through them when reaching rate-limits.""" + + credentials: Optional[List[Dict[str, str]]] = None + + def __init__( + self, user_agent: str, credentials: Optional[List[Dict[str, str]]] = None + ) -> None: + """Initialize a requests session with the proper headers for requests to + GitHub.""" + if credentials: + creds = credentials.copy() + random.shuffle(creds) + self.credentials = creds + + self.session = requests.Session() + + self.session.headers.update( + {"Accept": "application/vnd.github.v3+json", "User-Agent": user_agent} + ) + + self.anonymous = not self.credentials + + if self.anonymous: + logger.warning("No tokens set in configuration, using anonymous mode") + + self.token_index = -1 + self.current_user: Optional[str] = None + + if not self.anonymous: + # Initialize the first token value in the session headers + self.set_next_session_token() + + def set_next_session_token(self) -> None: + """Update the current authentication token with the next one in line.""" + + assert self.credentials + + self.token_index = (self.token_index + 1) % len(self.credentials) + + auth = self.credentials[self.token_index] + + self.current_user = auth["username"] + logger.debug("Using authentication token for user %s", self.current_user) + + if "password" in auth: + token = auth["password"] + else: + token = auth["token"] + + self.session.headers.update({"Authorization": f"token {token}"}) + + @retry( + wait=wait_exponential(multiplier=1, min=4, max=10), + retry=retry_any( + # ChunkedEncodingErrors happen when the TLS connection gets reset, e.g. + # when running the lister on a connection with high latency + retry_if_exception_type(requests.exceptions.ChunkedEncodingError), + # 502 status codes happen for a Server Error, sometimes + retry_if_result(lambda r: r.status_code == 502), + ), + ) + def _request(self, url: str) -> requests.Response: + response = self.session.get(url) + + if ( + # GitHub returns inconsistent status codes between unauthenticated + # rate limit and authenticated rate limits. Handle both. + response.status_code == 429 + or (self.anonymous and response.status_code == 403) + ): + raise RateLimited(response) + + return response + + def request(self, url) -> requests.Response: + """Repeatedly requests the given URL, cycling through credentials and sleeping + if necessary; until either a successful response or :exc:`MissingRateLimitReset` + """ + # The following for/else loop handles rate limiting; if successful, + # it provides the rest of the function with a `response` object. + # + # If all tokens are rate-limited, we sleep until the reset time, + # then `continue` into another iteration of the outer while loop, + # attempting to get data from the same URL again. + + while True: + max_attempts = len(self.credentials) if self.credentials else 1 + reset_times: Dict[int, int] = {} # token index -> time + for attempt in range(max_attempts): + try: + return self._request(url) + except RateLimited as e: + reset_info = "(unknown reset)" + if e.reset_time is not None: + reset_times[self.token_index] = e.reset_time + reset_info = "(resetting in %ss)" % (e.reset_time - time.time()) + + if not self.anonymous: + logger.info( + "Rate limit exhausted for current user %s %s", + self.current_user, + reset_info, + ) + # Use next token in line + self.set_next_session_token() + # Wait one second to avoid triggering GitHub's abuse rate limits + time.sleep(1) + + # All tokens have been rate-limited. What do we do? + + if not reset_times: + logger.warning( + "No X-Ratelimit-Reset value found in responses for any token; " + "Giving up." + ) + raise MissingRateLimitReset() + + sleep_time = max(reset_times.values()) - time.time() + 1 + logger.info( + "Rate limits exhausted for all tokens. Sleeping for %f seconds.", + sleep_time, + ) + time.sleep(sleep_time) diff --git a/swh/core/tests/test_github_utils.py b/swh/core/tests/test_github_utils.py index 52a4608..6b9397d 100644 --- a/swh/core/tests/test_github_utils.py +++ b/swh/core/tests/test_github_utils.py @@ -1,51 +1,337 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import logging +import time +from typing import Dict, Iterator, List, Optional, Union + import pytest +import requests_mock from swh.core.github.utils import ( + GitHubSession, _sanitize_github_url, _url_github_api, _url_github_html, get_canonical_github_origin_url, ) KNOWN_GH_REPO = "https://github.com/user/repo" @pytest.mark.parametrize( "user_repo, expected_url", [ ("user/repo.git", KNOWN_GH_REPO), ("user/repo.git/", KNOWN_GH_REPO), ("user/repo/", KNOWN_GH_REPO), ("user/repo", KNOWN_GH_REPO), ("user/repo/.git", KNOWN_GH_REPO), # edge cases ("https://github.com/unknown-page", None), # unknown gh origin returns None ("user/repo/with/some/deps", None), # url kind is not dealt with for now ], ) def test_get_canonical_github_origin_url(user_repo, expected_url, requests_mock): """It should return a canonical github origin when it exists, None otherwise""" html_url = _url_github_html(user_repo) api_url = _url_github_api(_sanitize_github_url(user_repo)) if expected_url is not None: status_code = 200 response = {"html_url": _sanitize_github_url(html_url)} else: status_code = 404 response = {} requests_mock.get(api_url, [{"status_code": status_code, "json": response}]) assert get_canonical_github_origin_url(html_url) == expected_url def test_get_canonical_github_origin_url_not_gh_origin(): """It should return the input url when that origin is not a github one""" url = "https://example.org" assert get_canonical_github_origin_url(url) == url + + +def fake_time_sleep(duration: float, sleep_calls: Optional[List[float]] = None): + """Record calls to time.sleep in the sleep_calls list""" + if duration < 0: + raise ValueError("Can't sleep for a negative amount of time!") + if sleep_calls is not None: + sleep_calls.append(duration) + + +def fake_time_time(): + """Return 0 when running time.time()""" + return 0 + + +@pytest.fixture +def monkeypatch_sleep_calls(monkeypatch) -> Iterator[List[float]]: + """Monkeypatch `time.time` and `time.sleep`. Returns a list cumulating the arguments + passed to time.sleep().""" + sleeps: List[float] = [] + monkeypatch.setattr(time, "sleep", lambda d: fake_time_sleep(d, sleeps)) + monkeypatch.setattr(time, "time", fake_time_time) + yield sleeps + + +@pytest.fixture() +def num_before_ratelimit() -> int: + """Number of successful requests before the ratelimit hits""" + return 0 + + +@pytest.fixture() +def num_ratelimit() -> Optional[int]: + """Number of rate-limited requests; None means infinity""" + return None + + +@pytest.fixture() +def ratelimit_reset() -> Optional[int]: + """Value of the X-Ratelimit-Reset header on ratelimited responses""" + return None + + +def github_ratelimit_callback( + request: requests_mock.request._RequestObjectProxy, + context: requests_mock.response._Context, + ratelimit_reset: Optional[int], +) -> Dict[str, str]: + """Return a rate-limited GitHub API response.""" + # Check request headers + assert request.headers["Accept"] == "application/vnd.github.v3+json" + assert request.headers["User-Agent"] is not None + if "Authorization" in request.headers: + context.status_code = 429 + else: + context.status_code = 403 + + if ratelimit_reset is not None: + context.headers["X-Ratelimit-Reset"] = str(ratelimit_reset) + + return { + "message": "API rate limit exceeded for .", + "documentation_url": "https://developer.github.com/v3/#rate-limiting", + } + + +def github_repo(i: int) -> Dict[str, Union[int, str]]: + """Basic repository information returned by the GitHub API""" + + repo: Dict[str, Union[int, str]] = { + "id": i, + "html_url": f"https://github.com/origin/{i}", + } + + # Set the pushed_at date on one of the origins + if i == 4321: + repo["pushed_at"] = "2018-11-08T13:16:24Z" + + return repo + + +HTTP_GH_API_URL = "https://api.github.com/repositories" + + +def github_response_callback( + request: requests_mock.request._RequestObjectProxy, + context: requests_mock.response._Context, + page_size: int = 1000, + origin_count: int = 10000, +) -> List[Dict[str, Union[str, int]]]: + """Return minimal GitHub API responses for the common case where the loader + hasn't been rate-limited""" + # Check request headers + assert request.headers["Accept"] == "application/vnd.github.v3+json" + assert request.headers["User-Agent"] is not None + + # Check request parameters: per_page == 1000, since = last_repo_id + assert "per_page" in request.qs + assert request.qs["per_page"] == [str(page_size)] + assert "since" in request.qs + + since = int(request.qs["since"][0]) + + next_page = since + page_size + if next_page < origin_count: + # the first id for the next page is within our origin count; add a Link + # header to the response + next_url = f"{HTTP_GH_API_URL}?per_page={page_size}&since={next_page}" + context.headers["Link"] = f"<{next_url}>; rel=next" + + return [github_repo(i) for i in range(since + 1, min(next_page, origin_count) + 1)] + + +@pytest.fixture() +def requests_ratelimited( + num_before_ratelimit: int, + num_ratelimit: Optional[int], + ratelimit_reset: Optional[int], +) -> Iterator[requests_mock.Mocker]: + """Mock requests to the GitHub API, returning a rate-limiting status code + after `num_before_ratelimit` requests. + + GitHub does inconsistent rate-limiting: + - Anonymous requests return a 403 status code + - Authenticated requests return a 429 status code, with an + X-Ratelimit-Reset header. + + This fixture takes multiple arguments (which can be overridden with a + :func:`pytest.mark.parametrize` parameter): + - num_before_ratelimit: the global number of requests until the + ratelimit triggers + - num_ratelimit: the number of requests that return a + rate-limited response. + - ratelimit_reset: the timestamp returned in X-Ratelimit-Reset if the + request is authenticated. + + The default values set in the previous fixtures make all requests return a rate + limit response. + """ + current_request = 0 + + def response_callback(request, context): + nonlocal current_request + current_request += 1 + if num_before_ratelimit < current_request and ( + num_ratelimit is None + or current_request < num_before_ratelimit + num_ratelimit + 1 + ): + return github_ratelimit_callback(request, context, ratelimit_reset) + else: + return github_response_callback(request, context) + + with requests_mock.Mocker() as mock: + mock.get(HTTP_GH_API_URL, json=response_callback) + yield mock + + +@pytest.fixture +def github_credentials() -> List[Dict[str, str]]: + """Return a static list of GitHub credentials""" + return sorted( + [{"username": f"swh{i:d}", "token": f"token-{i:d}"} for i in range(3)] + + [ + {"username": f"swh-legacy{i:d}", "password": f"token-legacy-{i:d}"} + for i in range(3) + ], + key=lambda c: c["username"], + ) + + +@pytest.fixture +def all_tokens(github_credentials) -> List[str]: + """Return the list of tokens matching the static credential""" + + return [t.get("token", t.get("password")) for t in github_credentials] + + +def test_github_session_anonymous_session(): + user_agent = ("GitHub Session Test",) + github_session = GitHubSession( + user_agent=user_agent, + ) + assert github_session.anonymous is True + + actual_headers = github_session.session.headers + assert actual_headers["Accept"] == "application/vnd.github.v3+json" + assert actual_headers["User-Agent"] == user_agent + + +@pytest.mark.parametrize( + "num_ratelimit", [1] # return a single rate-limit response, then continue +) +def test_github_session_ratelimit_once_recovery( + caplog, + requests_ratelimited, + num_ratelimit, + monkeypatch_sleep_calls, + github_credentials, +): + """GitHubSession should recover from hitting the rate-limit once""" + caplog.set_level(logging.DEBUG, "swh.core.github.utils") + + github_session = GitHubSession( + user_agent="GitHub Session Test", credentials=github_credentials + ) + + res = github_session.request(f"{HTTP_GH_API_URL}?per_page=1000&since=10") + assert res.status_code == 200 + + token_users = [] + for record in caplog.records: + if "Using authentication token" in record.message: + token_users.append(record.args[0]) + + # check that we used one more token than we saw rate limited requests + assert len(token_users) == 1 + num_ratelimit + + # check that we slept for one second between our token uses + assert monkeypatch_sleep_calls == [1] + + +def test_github_session_authenticated_credentials( + caplog, github_credentials, all_tokens +): + """GitHubSession should have Authorization headers set in authenticated mode""" + caplog.set_level(logging.DEBUG, "swh.core.github.utils") + + github_session = GitHubSession( + "GitHub Session Test", credentials=github_credentials + ) + + assert github_session.anonymous is False + assert github_session.token_index == 0 + assert ( + sorted(github_session.credentials, key=lambda t: t["username"]) + == github_credentials + ) + assert github_session.session.headers["Authorization"] in [ + f"token {t}" for t in all_tokens + ] + + +@pytest.mark.parametrize( + # Do 5 successful requests, return 6 ratelimits (to exhaust the credentials) with a + # set value for X-Ratelimit-Reset, then resume listing successfully. + "num_before_ratelimit, num_ratelimit, ratelimit_reset", + [(5, 6, 123456)], +) +def test_github_session_ratelimit_reset_sleep( + caplog, + requests_ratelimited, + monkeypatch_sleep_calls, + num_before_ratelimit, + num_ratelimit, + ratelimit_reset, + github_credentials, +): + """GitHubSession should handle rate-limit with authentication tokens.""" + caplog.set_level(logging.DEBUG, "swh.core.github.utils") + + github_session = GitHubSession( + user_agent="GitHub Session Test", credentials=github_credentials + ) + + for _ in range(num_ratelimit): + github_session.request(f"{HTTP_GH_API_URL}?per_page=1000&since=10") + + # We sleep 1 second every time we change credentials, then we sleep until + # ratelimit_reset + 1 + expected_sleep_calls = len(github_credentials) * [1] + [ratelimit_reset + 1] + assert monkeypatch_sleep_calls == expected_sleep_calls + + found_exhaustion_message = False + for record in caplog.records: + if record.levelname == "INFO": + if "Rate limits exhausted for all tokens" in record.message: + found_exhaustion_message = True + break + + assert found_exhaustion_message is True