Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/bitbucket/lister.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||
from datetime import datetime, timezone | from dataclasses import asdict, dataclass | ||||||||||
from datetime import datetime | |||||||||||
import logging | import logging | ||||||||||
from typing import Any, Dict, List, Optional | import time | ||||||||||
from typing import Any, Dict, Iterator, List, Optional | |||||||||||
from urllib import parse | from urllib import parse | ||||||||||
import iso8601 | import iso8601 | ||||||||||
from requests import Response | import requests | ||||||||||
from swh.lister.bitbucket.models import BitBucketModel | from swh.scheduler.interface import SchedulerInterface | ||||||||||
ardumont: Do we entirely drop the old bitbucket lister code?
If yes, you can also remove that module. | |||||||||||
from swh.lister.core.indexing_lister import IndexingHttpLister | from swh.scheduler.model import ListedOrigin | ||||||||||
from .. import USER_AGENT | |||||||||||
from ..pattern import CredentialsType, Lister | |||||||||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||||||
class BitBucketLister(IndexingHttpLister): | @dataclass | ||||||||||
PATH_TEMPLATE = "/repositories?after=%s" | class BitbucketListerState: | ||||||||||
MODEL = BitBucketModel | """State of my lister""" | ||||||||||
LISTER_NAME = "bitbucket" | |||||||||||
DEFAULT_URL = "https://api.bitbucket.org/2.0" | |||||||||||
instance = "bitbucket" | |||||||||||
default_min_bound = datetime.fromtimestamp(0, timezone.utc) # type: Any | |||||||||||
def __init__( | last_repo_cdate: Optional[datetime] = None | ||||||||||
self, url: str = None, override_config=None, per_page: int = 100 | """Date and time of the last repository listed on an incremental pass""" | ||||||||||
) -> None: | |||||||||||
super().__init__(url=url, override_config=override_config) | |||||||||||
per_page = self.config.get("per_page", per_page) | |||||||||||
self.PATH_TEMPLATE = "%s&pagelen=%s" % (self.PATH_TEMPLATE, per_page) | |||||||||||
def get_model_from_repo(self, repo: Dict) -> Dict[str, Any]: | |||||||||||
return { | |||||||||||
"uid": repo["uuid"], | |||||||||||
"indexable": iso8601.parse_date(repo["created_on"]), | |||||||||||
"name": repo["name"], | |||||||||||
"full_name": repo["full_name"], | |||||||||||
"html_url": repo["links"]["html"]["href"], | |||||||||||
"origin_url": repo["links"]["clone"][0]["href"], | |||||||||||
"origin_type": repo["scm"], | |||||||||||
} | |||||||||||
def get_next_target_from_response(self, response: Response) -> Optional[datetime]: | |||||||||||
"""This will read the 'next' link from the api response if any | |||||||||||
and return it as a datetime. | |||||||||||
Args: | |||||||||||
response (Response): requests' response from api call | |||||||||||
Returns: | class BitbucketLister(Lister[BitbucketListerState, List[Dict[str, Any]]]): | ||||||||||
next date as a datetime | """List origins from Bitbucket. | ||||||||||
""" | """ | ||||||||||
LISTER_NAME = "bitbucket" | |||||||||||
INSTANCE = "bitbucket" | |||||||||||
API_URL = "https://api.bitbucket.org/2.0" | |||||||||||
QUERY_TEMPLATE = "/repositories?pagelen={}&after={{}}" | |||||||||||
PAGE_SIZE = 100 | |||||||||||
INITIAL_BACKOFF = 10 # max anonymous 60 per hour, authenticated 1000 per hour | |||||||||||
MAX_RETRIES = 5 | |||||||||||
def __init__( | |||||||||||
self, | |||||||||||
scheduler: SchedulerInterface, | |||||||||||
per_page: int = 100, | |||||||||||
credentials: CredentialsType = None, | |||||||||||
): | |||||||||||
super().__init__( | |||||||||||
scheduler=scheduler, | |||||||||||
credentials=credentials, | |||||||||||
url=self.API_URL, | |||||||||||
Done Inline Actions — vlorentz: Maybe emit a warning if there is more than one set of credentials?
instance=self.INSTANCE, | |||||||||||
) | |||||||||||
self.url_template = f"{self.API_URL}{self.QUERY_TEMPLATE}".format(per_page) | |||||||||||
self.session = requests.Session() | |||||||||||
self.session.headers.update( | |||||||||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | |||||||||||
) | |||||||||||
# Support only one credential of type basic auth | |||||||||||
if len(self.credentials) > 0: | |||||||||||
cred = self.credentials[0] | |||||||||||
if "username" in cred: | |||||||||||
Done Inline Actions — vlorentz: Shouldn't these be (class)methods of `BitbucketListerState`?
Done Inline Actions — tenma: The new lister API has them as instance methods of the lister, but indeed the API could be amended to have them as ListerState class methods.
Done Inline Actions — vlorentz: Yeah, that makes sense; let's leave it like this then.
self.session.auth = (cred["username"], cred["password"]) | |||||||||||
Done Inline Actions
vlorentz: | |||||||||||
self.backoff = self.INITIAL_BACKOFF | |||||||||||
self.request_count = 0 | |||||||||||
Done Inline Actions — anlambert: Can you rename the function to `set_credentials`? Shorter is better. Also, the parameters should be typed as `Optional[str]`.
def state_from_dict(self, d: Dict[str, Any]) -> BitbucketListerState:
    """Build a ``BitbucketListerState`` from its serialized dict form.

    Args:
        d: serialized state. When it holds a non-None ``last_repo_cdate``,
            that value is parsed with ``iso8601.parse_date``. All keys are
            forwarded to the ``BitbucketListerState`` constructor.

    Returns:
        The state object built from ``d``.
    """
    # Work on a shallow copy: overwriting the entry in the caller's dict
    # would leave a datetime where a string is expected, so a second
    # deserialization of the same dict would hand a datetime to the parser.
    state_kwargs = dict(d)
    serialized_cdate = state_kwargs.get("last_repo_cdate")
    if serialized_cdate is not None:
        state_kwargs["last_repo_cdate"] = iso8601.parse_date(serialized_cdate)
    return BitbucketListerState(**state_kwargs)
Done Inline Actions — anlambert: You can remove that parameter, as it is already the default value of the decorator.
def state_to_dict(self, state: BitbucketListerState) -> Dict[str, Any]: | |||||||||||
Done Inline Actions — vlorentz: And why does it need to be `Optional[bool]` instead of just `bool`?
d = asdict(state) | |||||||||||
last_repo_cdate = d.get("last_repo_cdate") | |||||||||||
if last_repo_cdate is not None: | |||||||||||
d["last_repo_cdate"] = last_repo_cdate.isoformat() | |||||||||||
return d | |||||||||||
def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | |||||||||||
last_repo_cdate: Optional[str] = "1970-01-01" | |||||||||||
if self.state is not None and self.state.last_repo_cdate is not None: | |||||||||||
last_repo_cdate = self.state.last_repo_cdate.isoformat() | |||||||||||
while last_repo_cdate is not None: | |||||||||||
url = self.url_template.format(parse.quote(last_repo_cdate)) | |||||||||||
logger.debug("Page URL: %s", url) | |||||||||||
response = self.session.get(url) | |||||||||||
# handle HTTP errors | |||||||||||
if response.status_code == 429: | |||||||||||
if self.request_count >= self.MAX_RETRIES: | |||||||||||
logger.warning( | |||||||||||
"Max number of attempts hit (%s), giving up", | |||||||||||
self.request_count, | |||||||||||
) | |||||||||||
break | |||||||||||
logger.warning("Rate limit was hit, sleeping %ss", self.backoff) | |||||||||||
time.sleep(self.backoff) | |||||||||||
self.backoff *= 10 | |||||||||||
self.request_count += 1 | |||||||||||
continue | |||||||||||
if response.status_code != 200: | |||||||||||
logger.warning( | |||||||||||
"Got unexpected status_code %s: %s", | |||||||||||
response.status_code, | |||||||||||
response.content, | |||||||||||
) | |||||||||||
break | |||||||||||
self.request_count = 0 | |||||||||||
body = response.json() | body = response.json() | ||||||||||
next_ = body.get("next") | next_page_url = body.get("next") | ||||||||||
if next_ is not None: | if next_page_url is not None: | ||||||||||
next_ = parse.urlparse(next_) | next_page_url = parse.urlparse(next_page_url) | ||||||||||
return iso8601.parse_date(parse.parse_qs(next_.query)["after"][0]) | if not next_page_url.query: | ||||||||||
return None | logger.warning("Failed to parse url %s", next_page_url) | ||||||||||
break | |||||||||||
def transport_response_simplified(self, response: Response) -> List[Dict[str, Any]]: | last_repo_cdate = parse.parse_qs(next_page_url.query)["after"][0] | ||||||||||
repos = response.json()["values"] | |||||||||||
return [self.get_model_from_repo(repo) for repo in repos] | |||||||||||
def request_uri(self, identifier: datetime) -> str: # type: ignore | |||||||||||
identifier_str = parse.quote(identifier.isoformat()) | |||||||||||
return super().request_uri(identifier_str or "1970-01-01") | |||||||||||
def is_within_bounds( | |||||||||||
self, inner: int, lower: Optional[int] = None, upper: Optional[int] = None | |||||||||||
) -> bool: | |||||||||||
# values are expected to be datetimes | |||||||||||
if lower is None and upper is None: | |||||||||||
ret = True | |||||||||||
elif lower is None: | |||||||||||
ret = inner <= upper # type: ignore | |||||||||||
elif upper is None: | |||||||||||
ret = inner >= lower | |||||||||||
else: | else: | ||||||||||
ret = lower <= inner <= upper | last_repo_cdate = None | ||||||||||
return ret | |||||||||||
yield body["values"] | |||||||||||
def get_origins_from_page( | |||||||||||
self, page: List[Dict[str, Any]] | |||||||||||
) -> Iterator[ListedOrigin]: | |||||||||||
"""Convert a page of Bitbucket repositories into a list of ListedOrigins. | |||||||||||
Done Inline Actions — vlorentz: IMO this `yield` should be moved before `if next_page_url is not None`, for two reasons:
1. we…
""" | |||||||||||
assert self.lister_obj.id is not None | |||||||||||
for repo in page: | |||||||||||
last_update = iso8601.parse_date(repo["updated_on"]) | |||||||||||
origin_url = repo["links"]["clone"][0]["href"] | |||||||||||
origin_type = repo["scm"] | |||||||||||
yield ListedOrigin( | |||||||||||
lister_id=self.lister_obj.id, | |||||||||||
url=origin_url, | |||||||||||
visit_type=origin_type, | |||||||||||
last_update=last_update, | |||||||||||
) | |||||||||||
Not Done Inline Actions
vlorentz: | |||||||||||
def commit_page(self, page: List[Dict[str, Any]]):
    """Update the currently stored state using the latest listed page.

    The ``created_on`` date of the last repository in ``page`` replaces
    ``self.state.last_repo_cdate`` when no date is stored yet, or when it
    is more recent than the stored one.

    Args:
        page: repositories of one listed page. An empty page leaves the
            state unchanged.
    """
    if not page:
        # No repository to read a date from; ``page[-1]`` would raise
        # IndexError.
        return

    page_last_cdate = iso8601.parse_date(page[-1]["created_on"])
    stored_cdate = self.state.last_repo_cdate
    if stored_cdate is None or page_last_cdate > stored_cdate:
        self.state.last_repo_cdate = page_last_cdate
def finalize(self):
    """Set ``self.updated`` when this run saw a newer repository date.

    The state returned by ``self.get_state_from_scheduler()`` is compared
    with ``self.state``. ``self.updated`` becomes True only when this run
    recorded a ``last_repo_cdate`` and the scheduler-side state has none or
    an older one. Otherwise nothing is changed.
    """
    stored_state = self.get_state_from_scheduler()

    current_cdate = self.state.last_repo_cdate
    if current_cdate is None:
        # This run recorded no repository date: nothing to flag.
        return

    # Flag the state as updated only if the last seen creation date of the
    # current run is more recent than the one held by the scheduler.
    stored_cdate = stored_state.last_repo_cdate
    if stored_cdate is not None and not current_cdate > stored_cdate:
        return
    self.updated = True
Do we entirely drop the old bitbucket lister code?
If yes, you can also remove that module.