diff --git a/swh/core/github/pytest_plugin.py b/swh/core/github/pytest_plugin.py
index 20c5e80..a38160a 100644
--- a/swh/core/github/pytest_plugin.py
+++ b/swh/core/github/pytest_plugin.py
@@ -1,184 +1,202 @@
 # Copyright (C) 2020-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import time
 from typing import Dict, Iterator, List, Optional, Union

 import pytest
 import requests_mock

 HTTP_GITHUB_API_URL = "https://api.github.com/repositories"


 def fake_time_sleep(duration: float, sleep_calls: Optional[List[float]] = None):
     """Record calls to time.sleep in the sleep_calls list."""
     if duration < 0:
         raise ValueError("Can't sleep for a negative amount of time!")
     if sleep_calls is not None:
         sleep_calls.append(duration)


 def fake_time_time():
     """Return 0 when running time.time()"""
     return 0


 @pytest.fixture
 def monkeypatch_sleep_calls(monkeypatch) -> Iterator[List[float]]:
     """Monkeypatch `time.time` and `time.sleep`. Returns a list accumulating
     the arguments passed to time.sleep()."""
     sleeps: List[float] = []
     monkeypatch.setattr(time, "sleep", lambda d: fake_time_sleep(d, sleeps))
     monkeypatch.setattr(time, "time", fake_time_time)
     yield sleeps


 @pytest.fixture()
 def num_before_ratelimit() -> int:
     """Number of successful requests before the ratelimit hits"""
     return 0


 @pytest.fixture()
 def num_ratelimit() -> Optional[int]:
     """Number of rate-limited requests; None means infinity"""
     return None


 @pytest.fixture()
 def ratelimit_reset() -> Optional[int]:
     """Value of the X-Ratelimit-Reset header on ratelimited responses"""
     return None


 def github_ratelimit_callback(
     request: requests_mock.request._RequestObjectProxy,
     context: requests_mock.response._Context,
+    remaining_requests: int,
     ratelimit_reset: Optional[int],
 ) -> Dict[str, str]:
     """Return a rate-limited GitHub API response."""
     # Check request headers
     assert request.headers["Accept"] == "application/vnd.github.v3+json"
     assert request.headers["User-Agent"] is not None
     if "Authorization" in request.headers:
         context.status_code = 429
     else:
         context.status_code = 403

     if ratelimit_reset is not None:
         context.headers["X-Ratelimit-Reset"] = str(ratelimit_reset)
+    context.headers["X-Ratelimit-Remaining"] = str(remaining_requests)

     return {
         "message": "API rate limit exceeded for <IP address>.",
         "documentation_url": "https://developer.github.com/v3/#rate-limiting",
     }


 def github_repo(i: int) -> Dict[str, Union[int, str]]:
     """Basic repository information returned by the GitHub API"""
     repo: Dict[str, Union[int, str]] = {
         "id": i,
         "html_url": f"https://github.com/origin/{i}",
     }

     # Set the pushed_at date on one of the origins
     if i == 4321:
         repo["pushed_at"] = "2018-11-08T13:16:24Z"

     return repo


 def github_response_callback(
     request: requests_mock.request._RequestObjectProxy,
     context: requests_mock.response._Context,
+    remaining_requests: int,
     page_size: int = 1000,
     origin_count: int = 10000,
 ) -> List[Dict[str, Union[str, int]]]:
     """Return minimal GitHub API responses for the common case where the loader
     hasn't been rate-limited"""
     # Check request headers
     assert request.headers["Accept"] == "application/vnd.github.v3+json"
     assert request.headers["User-Agent"] is not None

     # Check request parameters: per_page == 1000, since = last_repo_id
     assert "per_page" in request.qs
     assert request.qs["per_page"] == [str(page_size)]
     assert "since" in request.qs

     since = int(request.qs["since"][0])

     next_page = since + page_size
     if next_page < origin_count:
         # the first id for the next page is within our origin count; add a Link
         # header to the response
         next_url = f"{HTTP_GITHUB_API_URL}?per_page={page_size}&since={next_page}"
         context.headers["Link"] = f"<{next_url}>; rel=next"

+    context.headers["X-Ratelimit-Remaining"] = str(remaining_requests)
+
     return [
         github_repo(i) for i in range(since + 1, min(next_page, origin_count) + 1)
     ]


 @pytest.fixture()
-def requests_ratelimited(
+def github_requests_ratelimited(
     num_before_ratelimit: int,
     num_ratelimit: Optional[int],
     ratelimit_reset: Optional[int],
 ) -> Iterator[requests_mock.Mocker]:
     """Mock requests to the GitHub API, returning a rate-limiting status code
     after `num_before_ratelimit` requests.

     GitHub does inconsistent rate-limiting:

     - Anonymous requests return a 403 status code
     - Authenticated requests return a 429 status code, with an
       X-Ratelimit-Reset header.

     This fixture takes multiple arguments (which can be overridden with a
     :func:`pytest.mark.parametrize` parameter):

     - num_before_ratelimit: the global number of requests until the ratelimit
       triggers
     - num_ratelimit: the number of requests that return a rate-limited response.
     - ratelimit_reset: the timestamp returned in X-Ratelimit-Reset if the request
       is authenticated.

     The default values set in the previous fixtures make all requests return a rate
     limit response.
     """
     current_request = 0

     def response_callback(request, context):
         nonlocal current_request
         current_request += 1
-        if num_before_ratelimit < current_request and (
-            num_ratelimit is None
-            or current_request < num_before_ratelimit + num_ratelimit + 1
+        if num_before_ratelimit >= current_request:
+            # case 1: not yet rate-limited
+            return github_response_callback(
+                request, context, (num_before_ratelimit or 1000) - current_request
+            )
+        elif (
+            num_ratelimit is not None
+            and current_request >= num_before_ratelimit + num_ratelimit + 1
         ):
-            return github_ratelimit_callback(request, context, ratelimit_reset)
+            # case 3: no longer rate-limited
+            return github_response_callback(
+                request, context, (num_before_ratelimit + 1000) - current_request
+            )
         else:
-            return github_response_callback(request, context)
+            # case 2: being rate-limited
+            return github_ratelimit_callback(
+                request,
+                context,
+                max(0, (num_before_ratelimit or 1000) - current_request),
+                ratelimit_reset,
+            )

     with requests_mock.Mocker() as mock:
         mock.get(HTTP_GITHUB_API_URL, json=response_callback)
         yield mock


 @pytest.fixture
 def github_credentials() -> List[Dict[str, str]]:
     """Return a static list of GitHub credentials"""
     return sorted(
         [{"username": f"swh{i:d}", "token": f"token-{i:d}"} for i in range(3)]
         + [
             {"username": f"swh-legacy{i:d}", "password": f"token-legacy-{i:d}"}
             for i in range(3)
         ],
         key=lambda c: c["username"],
     )


 @pytest.fixture
 def all_tokens(github_credentials) -> List[str]:
     """Return the list of tokens matching the static credentials"""
     return [t.get("token", t.get("password")) for t in github_credentials]
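For context, a minimal sketch of a test composing the fixtures above. The test name and parametrized values are illustrative assumptions, not part of this patch; it assumes the plugin's fixtures are registered with pytest:

# Hedged sketch: one anonymous rate-limited response (403), then recovery.
import pytest

from swh.core.github.pytest_plugin import HTTP_GITHUB_API_URL
from swh.core.github.utils import GitHubSession


@pytest.mark.parametrize("num_ratelimit, ratelimit_reset", [(1, 500)])
def test_sketch_anonymous_backoff(
    github_requests_ratelimited,
    monkeypatch_sleep_calls,
    num_ratelimit,
    ratelimit_reset,
):
    session = GitHubSession(user_agent="fixture-sketch")
    res = session.request(f"{HTTP_GITHUB_API_URL}?per_page=1000&since=0")
    assert res.status_code == 200
    # the anonymous session slept (in fake time) until the advertised reset + 1
    assert monkeypatch_sleep_calls == [ratelimit_reset + 1]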
diff --git a/swh/core/github/tests/test_github_utils.py b/swh/core/github/tests/test_github_utils.py
index d3978f7..2db2de3 100644
--- a/swh/core/github/tests/test_github_utils.py
+++ b/swh/core/github/tests/test_github_utils.py
@@ -1,211 +1,361 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+import itertools
 import logging
+from unittest.mock import call

 import pytest

 from swh.core.github.pytest_plugin import HTTP_GITHUB_API_URL
 from swh.core.github.utils import (
     GitHubSession,
     _sanitize_github_url,
     _url_github_api,
     get_canonical_github_origin_url,
 )

 KNOWN_GH_REPO = "https://github.com/user/repo"
 KNOWN_GH_REPO2 = "https://github.com/user/reposit"


 @pytest.mark.parametrize(
     "user_repo, expected_url",
     [
         ("user/repo.git", KNOWN_GH_REPO),
         ("user/repo.git/", KNOWN_GH_REPO),
         ("user/repo/", KNOWN_GH_REPO),
         ("user/repo", KNOWN_GH_REPO),
         ("user/repo/.git", KNOWN_GH_REPO),
         ("user/reposit.git", KNOWN_GH_REPO2),
         ("user/reposit.git/", KNOWN_GH_REPO2),
         ("user/reposit/", KNOWN_GH_REPO2),
         ("user/reposit", KNOWN_GH_REPO2),
         ("user/reposit/.git", KNOWN_GH_REPO2),
         ("unknown/page", None),  # unknown gh origin returns None
         ("user/with/deps", None),  # url kind is not dealt with
     ],
 )
 def test_get_canonical_github_origin_url(
     user_repo, expected_url, requests_mock, github_credentials
 ):
     """It should return a canonical github origin when it exists, None otherwise"""
     for separator in ["/", ":"]:
         for prefix in [
             "http://",
             "https://",
             "git://",
             "ssh://",
             "//",
             "git@",
             "ssh://git@",
             "https://${env.GITHUB_TOKEN_USR}:${env.GITHUB_TOKEN_PSW}@",
             "[fetch=]git@",
         ]:
             html_input_url = f"{prefix}github.com{separator}{user_repo}"
             html_url = f"https://github.com/{user_repo}"
             api_url = _url_github_api(_sanitize_github_url(user_repo))

             if expected_url is not None:
                 status_code = 200
                 response = {"html_url": _sanitize_github_url(html_url)}
             else:
                 status_code = 404
                 response = {}

             requests_mock.get(api_url, [{"status_code": status_code, "json": response}])

             # anonymous
             assert get_canonical_github_origin_url(html_input_url) == expected_url

             # with credentials
             assert (
                 get_canonical_github_origin_url(
                     html_input_url, credentials=github_credentials
                 )
                 == expected_url
             )

             # anonymous
             assert (
                 GitHubSession(
                     user_agent="GitHub Session Test",
                 ).get_canonical_url(html_input_url)
                 == expected_url
             )

             # with credentials
             assert (
                 GitHubSession(
                     user_agent="GitHub Session Test", credentials=github_credentials
                 ).get_canonical_url(html_input_url)
                 == expected_url
             )


 def test_get_canonical_github_origin_url_not_gh_origin():
     """It should return the input url when that origin is not a github one"""
     url = "https://example.org"
     assert get_canonical_github_origin_url(url) == url

     assert (
         GitHubSession(
             user_agent="GitHub Session Test",
         ).get_canonical_url(url)
         == url
     )


 def test_github_session_anonymous_session():
     user_agent = "GitHub Session Test"
     github_session = GitHubSession(
         user_agent=user_agent,
     )
     assert github_session.anonymous is True

     actual_headers = github_session.session.headers
     assert actual_headers["Accept"] == "application/vnd.github.v3+json"
     assert actual_headers["User-Agent"] == user_agent


 @pytest.mark.parametrize(
     "num_ratelimit", [1]  # return a single rate-limit response, then continue
 )
 def test_github_session_ratelimit_once_recovery(
     caplog,
-    requests_ratelimited,
+    mocker,
+    github_requests_ratelimited,
     num_ratelimit,
     monkeypatch_sleep_calls,
     github_credentials,
 ):
     """GitHubSession should recover from hitting the rate-limit once"""
     caplog.set_level(logging.DEBUG, "swh.core.github.utils")

     github_session = GitHubSession(
         user_agent="GitHub Session Test", credentials=github_credentials
     )

+    statsd_report = mocker.patch.object(github_session.statsd, "_report")
+
     res = github_session.request(f"{HTTP_GITHUB_API_URL}?per_page=1000&since=10")
     assert res.status_code == 200

     token_users = []
     for record in caplog.records:
         if "Using authentication token" in record.message:
             token_users.append(record.args[0])
     # check that we used one more token than we saw rate limited requests
     assert len(token_users) == 1 + num_ratelimit

     # check that we slept for one second between our token uses
     assert monkeypatch_sleep_calls == [1]

+    username0 = github_session.credentials[0]["username"]
+    username1 = github_session.credentials[1]["username"]
+    tags0 = {"username": username0, "http_status": 429}
+    tags1 = {"username": username1, "http_status": 200}
+    assert [c for c in statsd_report.mock_calls] == [
+        call("requests_total", "c", 1, {"username": username0}, 1),
+        call("responses_total", "c", 1, tags0, 1),
+        call("remaining_requests", "g", 999, {"username": username0}, 1),
+        call("rate_limited_responses_total", "c", 1, {"username": username0}, 1),
+        call("sleep", "c", 1, None, 1),
+        call("requests_total", "c", 1, {"username": username1}, 1),
+        call("responses_total", "c", 1, tags1, 1),
+        call("remaining_requests", "g", 998, {"username": username1}, 1),
+    ]
+    assert github_session.statsd.constant_tags == {
+        "api_type": "github",
+        "api_instance": "github",
+    }
+

 def test_github_session_authenticated_credentials(
     caplog, github_credentials, all_tokens
 ):
     """GitHubSession should have Authorization headers set in authenticated mode"""
     caplog.set_level(logging.DEBUG, "swh.core.github.utils")

     github_session = GitHubSession(
         "GitHub Session Test", credentials=github_credentials
     )

     assert github_session.anonymous is False
     assert github_session.token_index == 0
     assert (
         sorted(github_session.credentials, key=lambda t: t["username"])
         == github_credentials
     )
     assert github_session.session.headers["Authorization"] in [
         f"token {t}" for t in all_tokens
     ]


 @pytest.mark.parametrize(
     # Do 5 successful requests, return 6 ratelimits (to exhaust the credentials) with a
     # set value for X-Ratelimit-Reset, then resume listing successfully.
"num_before_ratelimit, num_ratelimit, ratelimit_reset", [(5, 6, 123456)], ) def test_github_session_ratelimit_reset_sleep( caplog, - requests_ratelimited, + mocker, + github_requests_ratelimited, monkeypatch_sleep_calls, num_before_ratelimit, num_ratelimit, ratelimit_reset, github_credentials, ): """GitHubSession should handle rate-limit with authentication tokens.""" caplog.set_level(logging.DEBUG, "swh.core.github.utils") github_session = GitHubSession( user_agent="GitHub Session Test", credentials=github_credentials ) + statsd_report = mocker.patch.object(github_session.statsd, "_report") + for _ in range(num_ratelimit): github_session.request(f"{HTTP_GITHUB_API_URL}?per_page=1000&since=10") # We sleep 1 second every time we change credentials, then we sleep until # ratelimit_reset + 1 expected_sleep_calls = len(github_credentials) * [1] + [ratelimit_reset + 1] assert monkeypatch_sleep_calls == expected_sleep_calls found_exhaustion_message = False for record in caplog.records: if record.levelname == "INFO": if "Rate limits exhausted for all tokens" in record.message: found_exhaustion_message = True break assert found_exhaustion_message is True + + username0 = github_session.credentials[0]["username"] + + def ok_request_calls(user, remaining): + return [ + call("requests_total", "c", 1, {"username": user}, 1), + call("responses_total", "c", 1, {"username": user, "http_status": 200}, 1), + call("remaining_requests", "g", remaining, {"username": user}, 1), + ] + + def ratelimited_request_calls(user): + return [ + call("requests_total", "c", 1, {"username": user}, 1), + call("responses_total", "c", 1, {"username": user, "http_status": 429}, 1), + call("remaining_requests", "g", 0, {"username": user}, 1), + call("reset_seconds", "g", ratelimit_reset, {"username": user}, 1), + call("rate_limited_responses_total", "c", 1, {"username": user}, 1), + call("sleep", "c", 1, None, 1), + ] + + expected_calls_groups = ( + # Successful requests + [ok_request_calls(username0, n - 1) for n in range(num_before_ratelimit, 0, -1)] + # Then rate-limited failures, cycling through tokens + + [ + ratelimited_request_calls( + github_session.credentials[n % len(github_credentials)]["username"] + ) + for n in range(num_ratelimit) + ] + # And finally, a long sleep and the successful request + + [ + [call("sleep", "c", ratelimit_reset + 1, None, 1)], + ok_request_calls( + github_session.credentials[num_ratelimit % len(github_credentials)][ + "username" + ], + 1000 - num_ratelimit - 1, + ), + ] + ) + expected_calls = list(itertools.chain.from_iterable(expected_calls_groups)) + assert [c for c in statsd_report.mock_calls] == expected_calls + assert github_session.statsd.constant_tags == { + "api_type": "github", + "api_instance": "github", + } + + +# Same as before, but with no credentials +@pytest.mark.parametrize( + "num_before_ratelimit, num_ratelimit, ratelimit_reset", + [(5, 6, 123456)], +) +def test_github_session_ratelimit_reset_sleep_anonymous( + caplog, + mocker, + github_requests_ratelimited, + monkeypatch_sleep_calls, + num_before_ratelimit, + num_ratelimit, + ratelimit_reset, +): + """GitHubSession should handle rate-limit with authentication tokens.""" + caplog.set_level(logging.DEBUG, "swh.core.github.utils") + + github_session = GitHubSession(user_agent="GitHub Session Test") + + statsd_report = mocker.patch.object(github_session.statsd, "_report") + + for _ in range(num_ratelimit): + github_session.request(f"{HTTP_GITHUB_API_URL}?per_page=1000&since=10") + + # No credentials, so we immediately sleep 
+    expected_sleep_calls = [ratelimit_reset + 1] * num_ratelimit
+    assert monkeypatch_sleep_calls == expected_sleep_calls
+
+    found_exhaustion_message = False
+    for record in caplog.records:
+        if record.levelname == "INFO":
+            if "Rate limits exhausted for all tokens" in record.message:
+                found_exhaustion_message = True
+                break
+
+    assert found_exhaustion_message is True
+
+    user = "anonymous"
+
+    def ok_request_calls(remaining):
+        return [
+            call("requests_total", "c", 1, {"username": user}, 1),
+            call("responses_total", "c", 1, {"username": user, "http_status": 200}, 1),
+            call("remaining_requests", "g", remaining, {"username": user}, 1),
+        ]
+
+    def ratelimited_request_calls():
+        return [
+            call("requests_total", "c", 1, {"username": user}, 1),
+            call("responses_total", "c", 1, {"username": user, "http_status": 403}, 1),
+            call("remaining_requests", "g", 0, {"username": user}, 1),
+            call("reset_seconds", "g", ratelimit_reset, {"username": user}, 1),
+            call("rate_limited_responses_total", "c", 1, {"username": user}, 1),
+            call("sleep", "c", ratelimit_reset + 1, None, 1),
+        ]
+
+    expected_calls_groups = (
+        # Successful requests
+        [ok_request_calls(n - 1) for n in range(num_before_ratelimit, 0, -1)]
+        # Then rate-limited failures, each with a long sleep
+        + [ratelimited_request_calls() for n in range(num_ratelimit)]
+        # And finally, the successful request
+        + [
+            ok_request_calls(
+                1000 - num_ratelimit - 1,
+            ),
+        ]
+    )
+    expected_calls = list(itertools.chain.from_iterable(expected_calls_groups))
+    assert [c for c in statsd_report.mock_calls] == expected_calls
+    assert github_session.statsd.constant_tags == {
+        "api_type": "github",
+        "api_instance": "github",
+    }
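As a reading aid for the `call(...)` assertions above: every metric funnels through `Statsd._report(metric, metric_type, value, tags, sample_rate)`, so patching `_report` captures exactly one call per metric. A self-contained illustration (the metric names and tag values here are arbitrary, not from the patch):

from unittest.mock import call, patch

from swh.core.statsd import Statsd

statsd = Statsd(namespace="swh_outbound_api")
with patch.object(statsd, "_report") as report:
    statsd.increment("requests_total", tags={"username": "swh0"})
    statsd.gauge("remaining_requests", 999, tags={"username": "swh0"})

# one _report(metric, type, value, tags, sample_rate) call per metric
assert report.mock_calls == [
    call("requests_total", "c", 1, {"username": "swh0"}, 1),
    call("remaining_requests", "g", 999, {"username": "swh0"}, 1),
]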
""" return GitHubSession( user_agent="SWH core library", credentials=credentials ).get_canonical_url(url) class RateLimited(Exception): def __init__(self, response): self.reset_time: Optional[int] # Figure out how long we need to sleep because of that rate limit ratelimit_reset = response.headers.get("X-Ratelimit-Reset") retry_after = response.headers.get("Retry-After") if ratelimit_reset is not None: self.reset_time = int(ratelimit_reset) elif retry_after is not None: self.reset_time = int(time.time()) + int(retry_after) + 1 else: logger.warning( "Received a rate-limit-like status code %s, but no rate-limit " "headers set. Response content: %s", response.status_code, response.content, ) self.reset_time = None self.response = response class MissingRateLimitReset(Exception): pass class GitHubSession: """Manages a :class:`requests.Session` with (optionally) multiple credentials, and cycles through them when reaching rate-limits.""" credentials: Optional[List[Dict[str, str]]] = None def __init__( self, user_agent: str, credentials: Optional[List[Dict[str, str]]] = None ) -> None: """Initialize a requests session with the proper headers for requests to GitHub.""" if credentials: creds = credentials.copy() random.shuffle(creds) self.credentials = creds + self.statsd = Statsd( + namespace="swh_outbound_api", + constant_tags={"api_type": "github", "api_instance": "github"}, + ) + self.session = requests.Session() self.session.headers.update( {"Accept": "application/vnd.github.v3+json", "User-Agent": user_agent} ) self.anonymous = not self.credentials if self.anonymous: logger.warning("No tokens set in configuration, using anonymous mode") self.token_index = -1 self.current_user: Optional[str] = None if not self.anonymous: # Initialize the first token value in the session headers self.set_next_session_token() def set_next_session_token(self) -> None: """Update the current authentication token with the next one in line.""" assert self.credentials self.token_index = (self.token_index + 1) % len(self.credentials) auth = self.credentials[self.token_index] self.current_user = auth["username"] logger.debug("Using authentication token for user %s", self.current_user) if "password" in auth: token = auth["password"] else: token = auth["token"] self.session.headers.update({"Authorization": f"token {token}"}) @retry( wait=wait_exponential(multiplier=1, min=4, max=10), retry=retry_any( # ChunkedEncodingErrors happen when the TLS connection gets reset, e.g. # when running the lister on a connection with high latency retry_if_exception_type(requests.exceptions.ChunkedEncodingError), # 502 status codes happen for a Server Error, sometimes retry_if_result(lambda r: r.status_code == 502), ), ) def _request(self, url: str) -> requests.Response: + # When anonymous, rate-limits are per-IP; but we cannot necessarily + # get the IP/hostname here as we may be containerized. Instead, we rely on + # statsd-exporter adding the hostname in the 'instance' tag. + tags = {"username": self.current_user or "anonymous"} + + self.statsd.increment("requests_total", tags=tags) + response = self.session.get(url) + # self.session.get(url) raises in case of non-HTTP error (DNS, TCP, TLS, ...), + # so responses_total may differ from requests_total. 
+        self.statsd.increment(
+            "responses_total", tags={**tags, "http_status": response.status_code}
+        )
+
+        try:
+            ratelimit_remaining = int(response.headers["x-ratelimit-remaining"])
+        except (KeyError, ValueError):
+            logger.warning(
+                "Invalid x-ratelimit-remaining header from GitHub: %r",
+                response.headers.get("x-ratelimit-remaining"),
+            )
+        else:
+            self.statsd.gauge("remaining_requests", ratelimit_remaining, tags=tags)
+
+        try:
+            reset_seconds = int(response.headers["x-ratelimit-reset"]) - time.time()
+        except (KeyError, ValueError):
+            logger.warning(
+                "Invalid x-ratelimit-reset header from GitHub: %r",
+                response.headers.get("x-ratelimit-reset"),
+            )
+        else:
+            self.statsd.gauge("reset_seconds", reset_seconds, tags=tags)
+
         if (
             # GitHub returns inconsistent status codes between unauthenticated
             # rate limit and authenticated rate limits. Handle both.
             response.status_code == 429
             or (self.anonymous and response.status_code == 403)
         ):
+            self.statsd.increment("rate_limited_responses_total", tags=tags)
             raise RateLimited(response)

         return response

     def request(self, url) -> requests.Response:
         """Repeatedly requests the given URL, cycling through credentials and
         sleeping if necessary, until it gets a successful response or raises
         :exc:`MissingRateLimitReset`
         """
         # The following for/else loop handles rate limiting; if successful,
         # it provides the rest of the function with a `response` object.
         #
         # If all tokens are rate-limited, we sleep until the reset time,
         # then `continue` into another iteration of the outer while loop,
         # attempting to get data from the same URL again.

         while True:
             max_attempts = len(self.credentials) if self.credentials else 1
             reset_times: Dict[int, int] = {}  # token index -> time
             for attempt in range(max_attempts):
                 try:
                     return self._request(url)
                 except RateLimited as e:
                     reset_info = "(unknown reset)"
                     if e.reset_time is not None:
                         reset_times[self.token_index] = e.reset_time
                         reset_info = "(resetting in %ss)" % (e.reset_time - time.time())

                     if not self.anonymous:
                         logger.info(
                             "Rate limit exhausted for current user %s %s",
                             self.current_user,
                             reset_info,
                         )
                         # Use next token in line
                         self.set_next_session_token()
                         # Wait one second to avoid triggering GitHub's abuse rate limits
+                        self.statsd.increment("sleep", 1)
                         time.sleep(1)

             # All tokens have been rate-limited. What do we do?

             if not reset_times:
                 logger.warning(
                     "No X-Ratelimit-Reset value found in responses for any token; "
                     "Giving up."
                 )
                 raise MissingRateLimitReset()

             sleep_time = max(reset_times.values()) - time.time() + 1
             logger.info(
                 "Rate limits exhausted for all tokens. Sleeping for %f seconds.",
                 sleep_time,
             )
+            self.statsd.increment("sleep", sleep_time)
             time.sleep(sleep_time)

     def get_canonical_url(self, url: str) -> Optional[str]:
         """Retrieve the canonical GitHub URL for ``url`` if any, None otherwise.

         This triggers an HTTP request to the GitHub API to determine the
         canonical repository URL.

         Returns:
             The canonical url if any, None otherwise.
""" url_ = url.lower() match = GITHUB_PATTERN.match(url_) if not match: return url user_repo = _sanitize_github_url(match.groupdict()["user_repo"]) response = self.request(_url_github_api(user_repo)) if response.status_code != 200: return None data = response.json() return data["html_url"] diff --git a/swh/core/statsd.py b/swh/core/statsd.py index b366d59..e841b7e 100644 --- a/swh/core/statsd.py +++ b/swh/core/statsd.py @@ -1,500 +1,500 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # Initially imported from https://github.com/DataDog/datadogpy/ # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436 # # Copyright (c) 2015, Datadog # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of Datadog nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # # Vastly adapted for integration in swh.core: # # - Removed python < 3.5 compat code # - trimmed the imports down to be a single module # - adjust some options: # - drop unix socket connection option # - add environment variable support for setting the statsd host and # port (pulled the idea from the main python statsd module) # - only send timer metrics in milliseconds (that's what # prometheus-statsd-exporter expects) # - drop DataDog-specific metric types (that are unsupported in # prometheus-statsd-exporter) # - made the tags a dict instead of a list (prometheus-statsd-exporter only # supports tags with a value, mirroring prometheus) # - switch from time.time to time.monotonic # - improve unit test coverage # - documentation cleanup from asyncio import iscoroutinefunction from contextlib import contextmanager from functools import wraps import itertools import logging import os from random import random import re import socket import threading from time import monotonic from typing import Collection, Dict, Optional import warnings log = logging.getLogger("swh.core.statsd") class TimedContextManagerDecorator(object): """ A context manager and a decorator which will report the elapsed time in the context OR in a function call. 
diff --git a/swh/core/statsd.py b/swh/core/statsd.py
index b366d59..e841b7e 100644
--- a/swh/core/statsd.py
+++ b/swh/core/statsd.py
@@ -1,500 +1,500 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 # Initially imported from https://github.com/DataDog/datadogpy/
 # at revision 62b3a3e89988dc18d78c282fe3ff5d1813917436
 #
 # Copyright (c) 2015, Datadog
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions are met:
 #     * Redistributions of source code must retain the above copyright
 #       notice, this list of conditions and the following disclaimer.
 #     * Redistributions in binary form must reproduce the above copyright
 #       notice, this list of conditions and the following disclaimer in the
 #       documentation and/or other materials provided with the distribution.
 #     * Neither the name of Datadog nor the names of its contributors may be
 #       used to endorse or promote products derived from this software without
 #       specific prior written permission.
 #
 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 # ARE DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #
 #
 # Vastly adapted for integration in swh.core:
 #
 # - Removed python < 3.5 compat code
 # - trimmed the imports down to be a single module
 # - adjust some options:
 #   - drop unix socket connection option
 #   - add environment variable support for setting the statsd host and
 #     port (pulled the idea from the main python statsd module)
 #   - only send timer metrics in milliseconds (that's what
 #     prometheus-statsd-exporter expects)
 #   - drop DataDog-specific metric types (that are unsupported in
 #     prometheus-statsd-exporter)
 #   - made the tags a dict instead of a list (prometheus-statsd-exporter only
 #     supports tags with a value, mirroring prometheus)
 #   - switch from time.time to time.monotonic
 #   - improve unit test coverage
 #   - documentation cleanup

 from asyncio import iscoroutinefunction
 from contextlib import contextmanager
 from functools import wraps
 import itertools
 import logging
 import os
 from random import random
 import re
 import socket
 import threading
 from time import monotonic
 from typing import Collection, Dict, Optional
 import warnings

 log = logging.getLogger("swh.core.statsd")


 class TimedContextManagerDecorator(object):
     """
     A context manager and a decorator which will report the elapsed time in
     the context OR in a function call.

     Attributes:
         elapsed (float): the elapsed time at the point of completion
     """

     def __init__(
         self, statsd, metric=None, error_metric=None, tags=None, sample_rate=1
     ):
         self.statsd = statsd
         self.metric = metric
         self.error_metric = error_metric
         self.tags = tags or {}
         self.sample_rate = sample_rate
         self.elapsed = None  # this is for testing purposes

     def __call__(self, func):
         """
         Decorator which returns the elapsed time of the function call.

         Default to the function name if metric was not provided.
         """
         if not self.metric:
             self.metric = "%s.%s" % (func.__module__, func.__name__)

         # Coroutines
         if iscoroutinefunction(func):

             @wraps(func)
             async def wrapped_co(*args, **kwargs):
                 start = monotonic()
                 try:
                     result = await func(*args, **kwargs)
                 except BaseException as e:
                     self._send_error(error_type=type(e).__name__)
                     raise
                 self._send(start)
                 return result

             return wrapped_co

         # Others
         @wraps(func)
         def wrapped(*args, **kwargs):
             start = monotonic()
             try:
                 result = func(*args, **kwargs)
             except BaseException as e:
                 self._send_error(error_type=type(e).__name__)
                 raise
             self._send(start)
             return result

         return wrapped

     def __enter__(self):
         if not self.metric:
             raise TypeError("Cannot use timed without a metric!")
         self._start = monotonic()
         return self

     def __exit__(self, type, value, traceback):
         # Report the elapsed time of the context manager if no error.
         if type is None:
             self._send(self._start)
         else:
             self._send_error(error_type=type.__name__)

     def _send(self, start):
         elapsed = (monotonic() - start) * 1000
         self.statsd.timing(
             self.metric, elapsed, tags=self.tags, sample_rate=self.sample_rate
         )
         self.elapsed = elapsed

     def _send_error(self, error_type=None):
         if self.error_metric is None:
             self.error_metric = self.metric + "_error_count"
         if error_type is not None:
             tags = {**self.tags, "error_type": error_type}
         else:
             tags = self.tags
         self.statsd.increment(self.error_metric, tags=tags)

     def start(self):
         """Start the timer"""
         self.__enter__()

     def stop(self):
         """Stop the timer, send the metric value"""
         self.__exit__(None, None, None)


 class Statsd(object):
     """Initialize a client to send metrics to a StatsD server.

     Arguments:
         host (str): the host of the StatsD server. Defaults to localhost.
         port (int): the port of the StatsD server. Defaults to 8125.
         max_buffer_size (int): Maximum number of metrics to buffer before
             sending to the server if sending metrics in batch
         namespace (str): Namespace to prefix all metric names
         constant_tags (Dict[str, str]): Tags to attach to all metrics

     Note:
         This class also supports the following environment variables:

         STATSD_HOST
             Override the default host of the statsd server
         STATSD_PORT
             Override the default port of the statsd server
         STATSD_TAGS
             Tags to attach to every metric reported.
             Example value: "label:value,other_label:other_value"
     """

     def __init__(
         self,
         host=None,
         port=None,
         max_buffer_size=50,
         namespace=None,
         constant_tags=None,
     ):
         # Connection
         if host is None:
             host = os.environ.get("STATSD_HOST") or "localhost"
         self.host = host

         if port is None:
             port = os.environ.get("STATSD_PORT") or 8125
         self.port = int(port)

         # Socket
         self._socket = None
         self.lock = threading.Lock()
         self.max_buffer_size = max_buffer_size
         self._send = self._send_to_server
         self.encoding = "utf-8"

         # Tags
         self.constant_tags = {}
         tags_envvar = os.environ.get("STATSD_TAGS", "")
         for tag in tags_envvar.split(","):
             if not tag:
                 continue
             if ":" not in tag:
                 warnings.warn(
-                    "STATSD_TAGS needs to be in key:value format, " "%s invalid" % tag,
+                    f"STATSD_TAGS needs to be in 'key:value' format, not {tag!r}",
                     UserWarning,
                 )
                 continue
             k, v = tag.split(":", 1)

             # look for a possible env var substitution, using $NAME or ${NAME} format
             m = re.match(r"^[$]([{])?(?P<envvar>\w+)(?(1)[}]|)$", v)
             if m:
                 envvar = m.group("envvar")
                 if envvar in os.environ:
                     v = os.environ[envvar]

             self.constant_tags[k] = v

         if constant_tags:
             self.constant_tags.update(
                 {str(k): str(v) for k, v in constant_tags.items()}
             )

         # Namespace
         if namespace is not None:
             namespace = str(namespace)
         self.namespace = namespace

     def __enter__(self):
         self.open_buffer(self.max_buffer_size)
         return self

     def __exit__(self, type, value, traceback):
         self.close_buffer()

     def gauge(self, metric, value, tags=None, sample_rate=1):
         """
         Record the value of a gauge, optionally setting a list of tags and a
         sample rate.

         >>> statsd.gauge('users.online', 123)
         >>> statsd.gauge('active.connections', 1001, tags={"protocol": "http"})
         """
         return self._report(metric, "g", value, tags, sample_rate)

     def increment(self, metric, value=1, tags=None, sample_rate=1):
         """
         Increment a counter, optionally setting a value, tags and a sample
         rate.

         >>> statsd.increment('page.views')
         >>> statsd.increment('files.transferred', 124)
         """
         self._report(metric, "c", value, tags, sample_rate)

     def decrement(self, metric, value=1, tags=None, sample_rate=1):
         """
         Decrement a counter, optionally setting a value, tags and a sample
         rate.

         >>> statsd.decrement('files.remaining')
         >>> statsd.decrement('active.connections', 2)
         """
         metric_value = -value if value else value
         self._report(metric, "c", metric_value, tags, sample_rate)

     def histogram(self, metric, value, tags=None, sample_rate=1):
         """
         Sample a histogram value, optionally setting tags and a sample rate.

         >>> statsd.histogram('uploaded.file.size', 1445)
         >>> statsd.histogram('file.count', 26, tags={"filetype": "python"})
         """
         self._report(metric, "h", value, tags, sample_rate)

     def timing(self, metric, value, tags=None, sample_rate=1):
         """
         Record a timing, optionally setting tags and a sample rate.

         >>> statsd.timing("query.response.time", 1234)
         """
         self._report(metric, "ms", value, tags, sample_rate)

     def timed(self, metric=None, error_metric=None, tags=None, sample_rate=1):
         """
         A decorator or context manager that will measure the distribution of a
         function's/context's run time. Optionally specify a list of tags or a
         sample rate. If the metric is not defined as a decorator, the module
         name and function name will be used. The metric is required as a
         context manager.
         ::

             @statsd.timed('user.query.time', sample_rate=0.5)
             def get_user(user_id):
                 # Do what you need to ...
                 pass

             # Is equivalent to ...
             with statsd.timed('user.query.time', sample_rate=0.5):
                 # Do what you need to ...
                 pass

             # Is equivalent to ...
             start = time.monotonic()
             try:
                 get_user(user_id)
             finally:
                 statsd.timing('user.query.time', time.monotonic() - start)
         """
         return TimedContextManagerDecorator(
             statsd=self,
             metric=metric,
             error_metric=error_metric,
             tags=tags,
             sample_rate=sample_rate,
         )

     def set(self, metric, value, tags=None, sample_rate=1):
         """
         Sample a set value.

         >>> statsd.set('visitors.uniques', 999)
         """
         self._report(metric, "s", value, tags, sample_rate)

     @property
     def socket(self):
         """
         Return a connected socket.

         Note: connect the socket before assigning it to the class instance to
         avoid bad thread race conditions.
         """
         with self.lock:
             if not self._socket:
                 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                 sock.connect((self.host, self.port))
                 self._socket = sock

             return self._socket

     def open_buffer(self, max_buffer_size=50):
         """
         Open a buffer to send a batch of metrics in one packet.

         You can also use this as a context manager.

         >>> with Statsd() as batch:
         ...     batch.gauge('users.online', 123)
         ...     batch.gauge('active.connections', 1001)
         """
         self.max_buffer_size = max_buffer_size
         self.buffer = []
         self._send = self._send_to_buffer

     def close_buffer(self):
         """
         Flush the buffer and switch back to single metric packets.
         """
         self._send = self._send_to_server

         if self.buffer:
             # Only send packets if there are packets to send
             self._flush_buffer()

     def close_socket(self):
         """
         Closes connected socket if connected.
         """
         with self.lock:
             if self._socket:
                 self._socket.close()
             self._socket = None

     def _report(self, metric, metric_type, value, tags, sample_rate):
         """
         Create a metric packet and send it.
         """
         if value is None:
             return

         if sample_rate != 1 and random() > sample_rate:
             return

         # Resolve the full tag list
         tags = self._add_constant_tags(tags)

         # Create/format the metric packet
         payload = "%s%s:%s|%s%s%s" % (
             (self.namespace + ".") if self.namespace else "",
             metric,
             value,
             metric_type,
             ("|@" + str(sample_rate)) if sample_rate != 1 else "",
             ("|#" + ",".join("%s:%s" % (k, v) for (k, v) in sorted(tags.items())))
             if tags
             else "",
         )

         # Send it
         self._send(payload)

     def _send_to_server(self, packet):
         try:
             # If set, use socket directly
             self.socket.send(packet.encode("utf-8"))
         except socket.timeout:
             return
         except socket.error:
             log.debug(
                 "Error submitting statsd packet."
                 " Dropping the packet and closing the socket."
             )
             self.close_socket()

     def _send_to_buffer(self, packet):
         self.buffer.append(packet)
         if len(self.buffer) >= self.max_buffer_size:
             self._flush_buffer()

     def _flush_buffer(self):
         self._send_to_server("\n".join(self.buffer))
         self.buffer = []

     def _add_constant_tags(self, tags):
         return {
             str(k): str(v)
             for k, v in itertools.chain(
                 self.constant_tags.items(),
                 (tags if tags else {}).items(),
             )
         }

     @contextmanager
     def status_gauge(
         self,
         metric_name: str,
         statuses: Collection[str],
         tags: Optional[Dict[str, str]] = None,
     ):
         """Context manager to keep track of status changes as a gauge

         In addition to the `metric_name` and `tags` arguments, it expects a
         list of `statuses` to declare which statuses are possible, and returns
         a callable as context manager. This callable takes one of the possible
         statuses as argument. Typical usage would be:

         >>> with statsd.status_gauge(
                 "metric_name", ["starting", "processing", "waiting"]) as set_status:
                 set_status("starting")
                 # ...
                 set_status("waiting")
                 # ...
""" if tags is None: tags = {} current_status: Optional[str] = None # reset status gauges to make sure they do not "leak" for status in statuses: self.gauge(metric_name, 0, {**tags, "status": status}) def set_status(new_status: str): nonlocal current_status assert isinstance(tags, dict) if new_status not in statuses: raise ValueError(f"{new_status} not in {statuses}") if current_status and new_status != current_status: self.gauge(metric_name, 0, {**tags, "status": current_status}) current_status = new_status self.gauge(metric_name, 1, {**tags, "status": current_status}) yield set_status # reset gauges on exit for status in statuses: self.gauge(metric_name, 0, {**tags, "status": status}) statsd = Statsd()