class MissingRateLimitReset(Exception):
    """Raised when every request attempt was rate-limited and none of the
    rate-limited responses provided a reset time, so there is no known moment
    to wait for before retrying."""
class GitHubSession:
    """Manages a :class:`requests.Session` with (optionally) multiple credentials,
    and cycles through them when reaching rate-limits."""

    def __init__(self, credentials: Optional[List[Dict[str, str]]] = None) -> None:
        """Initialize a requests session with the proper headers for requests to
        GitHub.

        Args:
            credentials: optional list of credential dicts. Each entry is read
              for a ``username`` key and either a ``password`` or a ``token``
              key. When empty or ``None``, the session runs in anonymous mode.
        """
        # Work on a private copy: shuffling the caller's list in place would
        # reorder their data as a side effect of building a session.
        self.credentials = list(credentials) if credentials is not None else None
        if self.credentials:
            random.shuffle(self.credentials)

        self.session = requests.Session()

        self.session.headers.update(
            {"Accept": "application/vnd.github.v3+json", "User-Agent": USER_AGENT}
        )

        self.anonymous = not self.credentials

        if self.anonymous:
            logger.warning("No tokens set in configuration, using anonymous mode")

        # -1 so that the first call to set_next_session_token() selects index 0
        self.token_index = -1
        self.current_user: Optional[str] = None

        if not self.anonymous:
            # Initialize the first token value in the session headers
            self.set_next_session_token()

    def set_next_session_token(self) -> None:
        """Update the current authentication token with the next one in line.

        Wraps around to the first credential after the last one. Must only be
        called when credentials were provided.
        """

        assert self.credentials

        self.token_index = (self.token_index + 1) % len(self.credentials)

        auth = self.credentials[self.token_index]

        self.current_user = auth["username"]
        logger.debug("Using authentication token for user %s", self.current_user)

        # The secret is stored under "password" when present, else under "token"
        if "password" in auth:
            token = auth["password"]
        else:
            token = auth["token"]

        self.session.headers.update({"Authorization": f"token {token}"})

    @retry(
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_any(
            # ChunkedEncodingErrors happen when the TLS connection gets reset, e.g.
            # when running the lister on a connection with high latency
            retry_if_exception_type(requests.exceptions.ChunkedEncodingError),
            # 502 status codes happen for a Server Error, sometimes
            retry_if_result(lambda r: r.status_code == 502),
        ),
    )
    def _request(self, url: str) -> requests.Response:
        """Perform a single GET on ``url`` with the current session headers.

        The call is retried (see the decorator) on chunked-encoding errors and
        on 502 responses.

        Raises:
            RateLimited: on a 429 response, or on a 403 response while in
              anonymous mode.
        """
        response = self.session.get(url)

        if (
            # GitHub returns inconsistent status codes between unauthenticated
            # rate limit and authenticated rate limits. Handle both.
            response.status_code == 429
            or (self.anonymous and response.status_code == 403)
        ):
            raise RateLimited(response)

        return response

    def request(self, url: str) -> requests.Response:
        """Repeatedly requests the given URL, cycling through credentials and sleeping
        if necessary; until either a non-rate-limited response or
        :exc:`MissingRateLimitReset`.

        Args:
            url: the URL to fetch.

        Returns:
            The first response that is not rate-limited. It is not otherwise
            validated here; callers still need to check its status.

        Raises:
            MissingRateLimitReset: if every attempt of a round was rate-limited
              and none of them carried a reset time to wait for.
        """
        # One attempt per credential (a single attempt in anonymous mode).
        max_attempts = len(self.credentials) if self.credentials else 1

        # Each iteration of the outer loop is one full round over the tokens.
        # A round either returns a response, or ends with every token
        # rate-limited; in that case we sleep until the latest known reset
        # time and start a new round on the same URL.
        while True:
            reset_times: Dict[int, int] = {}  # token index -> time
            for _ in range(max_attempts):
                try:
                    return self._request(url)
                except RateLimited as e:
                    reset_info = "(unknown reset)"
                    if e.reset_time is not None:
                        reset_times[self.token_index] = e.reset_time
                        reset_info = "(resetting in %ss)" % (e.reset_time - time.time())

                    if not self.anonymous:
                        logger.info(
                            "Rate limit exhausted for current user %s %s",
                            self.current_user,
                            reset_info,
                        )
                        # Use next token in line
                        self.set_next_session_token()
                        # Wait one second to avoid triggering GitHub's abuse rate limits
                        time.sleep(1)

            # All tokens have been rate-limited. What do we do?

            if not reset_times:
                logger.warning(
                    "No X-Ratelimit-Reset value found in responses for any token; "
                    "Giving up."
                )
                raise MissingRateLimitReset()

            # Clamp at zero: the reset times may already be in the past (clock
            # skew, or time spent on the per-token pauses above), and
            # time.sleep() raises ValueError on a negative duration.
            sleep_time = max(0.0, max(reset_times.values()) - time.time() + 1)
            logger.info(
                "Rate limits exhausted for all tokens. Sleeping for %f seconds.",
                sleep_time,
            )
            time.sleep(sleep_time)
- - max_attempts = 1 if self.anonymous else len(self.credentials) - reset_times: Dict[int, int] = {} # token index -> time - for attempt in range(max_attempts): - try: - response = github_request(current_url, session=self.session) - break - except RateLimited as e: - reset_info = "(unknown reset)" - if e.reset_time is not None: - reset_times[self.token_index] = e.reset_time - reset_info = "(resetting in %ss)" % (e.reset_time - time.time()) - - if not self.anonymous: - logger.info( - "Rate limit exhausted for current user %s %s", - self.current_user, - reset_info, - ) - # Use next token in line - self.set_next_session_token() - # Wait one second to avoid triggering GitHub's abuse rate limits - time.sleep(1) - else: - # All tokens have been rate-limited. What do we do? - - if not reset_times: - logger.warning( - "No X-Ratelimit-Reset value found in responses for any token; " - "Giving up." - ) - break - - sleep_time = max(reset_times.values()) - time.time() + 1 - logger.info( - "Rate limits exhausted for all tokens. Sleeping for %f seconds.", - sleep_time, - ) - time.sleep(sleep_time) - # This goes back to the outer page-by-page loop, doing one more - # iteration on the same page - continue + try: + response = self.github_session.request(current_url) + except MissingRateLimitReset: + # Give up + break # We've successfully retrieved a (non-ratelimited) `response`. We # still need to check it for validity. 
diff --git a/swh/lister/github/tests/test_lister.py b/swh/lister/github/tests/test_lister.py --- a/swh/lister/github/tests/test_lister.py +++ b/swh/lister/github/tests/test_lister.py @@ -270,7 +270,7 @@ caplog.set_level(logging.DEBUG, "swh.lister.github.lister") lister = GitHubLister(scheduler=swh_scheduler) - assert lister.anonymous + assert lister.github_session.anonymous assert "using anonymous mode" in caplog.records[-1].message caplog.clear() @@ -315,9 +315,9 @@ caplog.set_level(logging.DEBUG, "swh.lister.github.lister") lister = GitHubLister(scheduler=swh_scheduler, credentials=lister_credentials) - assert lister.token_index == 0 + assert lister.github_session.token_index == 0 assert sorted(lister.credentials, key=lambda t: t["username"]) == github_credentials - assert lister.session.headers["Authorization"] in [ + assert lister.github_session.session.headers["Authorization"] in [ "token %s" % t for t in all_tokens ]