Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/github/lister.py
Show First 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | @retry( | ||||
retry=retry_any( | retry=retry_any( | ||||
# ChunkedEncodingErrors happen when the TLS connection gets reset, e.g. | # ChunkedEncodingErrors happen when the TLS connection gets reset, e.g. | ||||
# when running the lister on a connection with high latency | # when running the lister on a connection with high latency | ||||
retry_if_exception_type(requests.exceptions.ChunkedEncodingError), | retry_if_exception_type(requests.exceptions.ChunkedEncodingError), | ||||
# 502 status codes happen for a Server Error, sometimes | # 502 status codes happen for a Server Error, sometimes | ||||
retry_if_result(lambda r: r.status_code == 502), | retry_if_result(lambda r: r.status_code == 502), | ||||
), | ), | ||||
) | ) | ||||
def github_request( | def github_request(url: str, session: requests.Session) -> requests.Response: | ||||
url: str, token: Optional[str] = None, session: Optional[requests.Session] = None | response = session.get(url) | ||||
) -> requests.Response: | |||||
session = init_session(session) | |||||
headers = {} | |||||
if token: | |||||
headers["Authorization"] = f"token {token}" | |||||
response = session.get(url, headers=headers) | anonymous = "Authorization" not in session.headers | ||||
anonymous = token is None and "Authorization" not in session.headers | |||||
if ( | if ( | ||||
# GitHub returns inconsistent status codes between unauthenticated | # GitHub returns inconsistent status codes between unauthenticated | ||||
# rate limit and authenticated rate limits. Handle both. | # rate limit and authenticated rate limits. Handle both. | ||||
response.status_code == 429 | response.status_code == 429 | ||||
or (anonymous and response.status_code == 403) | or (anonymous and response.status_code == 403) | ||||
): | ): | ||||
raise RateLimited(response) | raise RateLimited(response) | ||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | def set_next_session_token(self) -> None: | ||||
if "password" in auth: | if "password" in auth: | ||||
token = auth["password"] | token = auth["password"] | ||||
else: | else: | ||||
token = auth["token"] | token = auth["token"] | ||||
self.current_user = auth["username"] | self.current_user = auth["username"] | ||||
logger.debug("Using authentication token for user %s", self.current_user) | logger.debug("Using authentication token for user %s", self.current_user) | ||||
self.session.headers.update({"Authorization": f"token {token}"}) | self.session.headers.update({"Authorization": f"token {token}"}) | ||||
ardumont: Ok, it's handled in self.session (see previous lines).
Now i got it. | |||||
def state_from_dict(self, d: Dict[str, Any]) -> GitHubListerState: | def state_from_dict(self, d: Dict[str, Any]) -> GitHubListerState: | ||||
return GitHubListerState(**d) | return GitHubListerState(**d) | ||||
def state_to_dict(self, state: GitHubListerState) -> Dict[str, Any]: | def state_to_dict(self, state: GitHubListerState) -> Dict[str, Any]: | ||||
return asdict(state) | return asdict(state) | ||||
def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | ||||
▲ Show 20 Lines • Show All 152 Lines • Show Last 20 Lines |
Ok, it's handled in self.session (see previous lines).
Now i got it.