Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/github/lister.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
import time | import time | ||||
from typing import Any | from typing import Any, Optional | ||||
from swh.lister.core.indexing_lister import IndexingHttpLister | from swh.lister.core.indexing_lister import IndexingHttpLister | ||||
from swh.lister.github.models import GitHubModel | from swh.lister.github.models import GitHubModel | ||||
class GitHubLister(IndexingHttpLister): | class GitHubLister(IndexingHttpLister): | ||||
PATH_TEMPLATE = '/repositories?since=%d' | PATH_TEMPLATE = '/repositories?since=%d' | ||||
MODEL = GitHubModel | MODEL = GitHubModel | ||||
Show All 11 Lines | def get_model_from_repo(self, repo): | ||||
'full_name': repo['full_name'], | 'full_name': repo['full_name'], | ||||
'html_url': repo['html_url'], | 'html_url': repo['html_url'], | ||||
'origin_url': repo['html_url'], | 'origin_url': repo['html_url'], | ||||
'origin_type': 'git', | 'origin_type': 'git', | ||||
'fork': repo['fork'], | 'fork': repo['fork'], | ||||
} | } | ||||
def transport_quota_check(self, response): | def transport_quota_check(self, response): | ||||
"""Check for rate limit usage | |||||
""" | |||||
def delay(response) -> float: | |||||
"""Compute next delay query tryout given the current response""" | |||||
reset_at = int(response.headers['X-RateLimit-Reset']) | |||||
delay = min(reset_at - time.time(), 3600) | |||||
return delay | |||||
douardda: No need for this function to be defined inline here (it does not capture anything from the enclosing scope, or similar). So no… | |||||
Done Inline Actions: Do you mean this can be set as a class method directly? (My main concern was to avoid repeating this code in the conditional below.) ardumont: Do you mean this can be set as a class method directly?
(My main concern was to avoid repetition… | |||||
def credential_used(response) -> Optional[str]: | |||||
"""Compute the current credential used given the current response | |||||
""" | |||||
authorization = response.request.headers.get('Authorization') | |||||
if not authorization: | |||||
return None | |||||
authorization = authorization.split('Basic ') | |||||
from base64 import b64decode | |||||
cred_tuple = b64decode(authorization).decode('utf-8').split(':') | |||||
Done Inline Actions: Another implementation could be to override the default `request_params` method and record the auth tuple currently in use (self.auth = auth or something similar)... I'm not sure which is cleaner, clearer, etc. ardumont: Another implementation could be to override the default `request_params` method and set the current… | |||||
return cred_tuple[0] # the credential login | |||||
x_rate_limit_remaining = response.headers.get('X-RateLimit-Remaining') | x_rate_limit_remaining = response.headers.get('X-RateLimit-Remaining') | ||||
if not x_rate_limit_remaining: | if not x_rate_limit_remaining: | ||||
return False, 0 | return False, 0 | ||||
reqs_remaining = int(x_rate_limit_remaining) | reqs_remaining = int(x_rate_limit_remaining) | ||||
if response.status_code == 403 and reqs_remaining == 0: | if response.status_code == 403 and reqs_remaining == 0: | ||||
reset_at = int(response.headers['X-RateLimit-Reset']) | return True, delay(response) | ||||
delay = min(reset_at - time.time(), 3600) | if response.status_code == 401: | ||||
return True, delay | data = response.json() | ||||
if data['message'] == 'Bad Credentials': | |||||
# The authentication token used is expired. Remove it from the | |||||
# configuration and try again immediately with another | |||||
if hasattr(self, 'creds'): | |||||
login = credential_used(response) | |||||
if login in self.creds: | |||||
self.creds.pop(login) | |||||
return False, 0 | |||||
return True, delay(response) | |||||
return False, 0 | return False, 0 | ||||
def get_next_target_from_response(self, response): | def get_next_target_from_response(self, response): | ||||
if 'next' in response.links: | if 'next' in response.links: | ||||
next_url = response.links['next']['url'] | next_url = response.links['next']['url'] | ||||
return int(self.API_URL_INDEX_RE.match(next_url).group(1)) | return int(self.API_URL_INDEX_RE.match(next_url).group(1)) | ||||
def transport_response_simplified(self, response): | def transport_response_simplified(self, response): | ||||
Show All 21 Lines |
There is no need for this function to be defined inline here (it does not capture anything from the enclosing scope, or similar), so there is no need to instantiate a new function object on each call of this method.