Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/bitbucket/lister.py
# Copyright (C) 2017-2021 The Software Heritage developers | # Copyright (C) 2017-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import asdict, dataclass | from dataclasses import asdict, dataclass | ||||
from datetime import datetime | from datetime import datetime | ||||
import logging | import logging | ||||
import random | import random | ||||
from typing import Any, Dict, Iterator, List, Optional | from typing import Any, Dict, Iterator, List, Optional | ||||
from urllib import parse | from urllib import parse | ||||
import iso8601 | import iso8601 | ||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import http_retry | |||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from .. import USER_AGENT | |||||
from ..pattern import CredentialsType, Lister | from ..pattern import CredentialsType, Lister | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
@dataclass | @dataclass | ||||
class BitbucketListerState: | class BitbucketListerState: | ||||
"""State of Bitbucket lister""" | """State of Bitbucket lister""" | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | ): | ||||
# only return needed JSON fields in bitbucket API responses | # only return needed JSON fields in bitbucket API responses | ||||
# (also prevent errors 500 when listing) | # (also prevent errors 500 when listing) | ||||
"fields": ( | "fields": ( | ||||
"next,values.links.clone.href,values.scm,values.updated_on," | "next,values.links.clone.href,values.scm,values.updated_on," | ||||
"values.created_on" | "values.created_on" | ||||
), | ), | ||||
} | } | ||||
self.session = requests.Session() | self.session.headers.update({"Accept": "application/json"}) | ||||
self.session.headers.update( | |||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | |||||
) | |||||
if len(self.credentials) > 0: | if len(self.credentials) > 0: | ||||
cred = random.choice(self.credentials) | cred = random.choice(self.credentials) | ||||
logger.warning("Using Bitbucket credentials from user %s", cred["username"]) | logger.warning("Using Bitbucket credentials from user %s", cred["username"]) | ||||
self.set_credentials(cred["username"], cred["password"]) | self.set_credentials(cred["username"], cred["password"]) | ||||
else: | else: | ||||
logger.warning("No credentials set in configuration, using anonymous mode") | logger.warning("No credentials set in configuration, using anonymous mode") | ||||
Show All 10 Lines | def state_to_dict(self, state: BitbucketListerState) -> Dict[str, Any]: | ||||
d["last_repo_cdate"] = last_repo_cdate.isoformat() | d["last_repo_cdate"] = last_repo_cdate.isoformat() | ||||
return d | return d | ||||
def set_credentials(self, username: Optional[str], password: Optional[str]) -> None: | def set_credentials(self, username: Optional[str], password: Optional[str]) -> None: | ||||
"""Set basic authentication headers with given credentials.""" | """Set basic authentication headers with given credentials.""" | ||||
if username is not None and password is not None: | if username is not None and password is not None: | ||||
self.session.auth = (username, password) | self.session.auth = (username, password) | ||||
@http_retry(before_sleep=before_sleep_log(logger, logging.DEBUG)) | |||||
def page_request(self, last_repo_cdate: str) -> requests.Response: | |||||
self.url_params["after"] = last_repo_cdate | |||||
logger.debug("Fetching URL %s with params %s", self.url, self.url_params) | |||||
response = self.session.get(self.url, params=self.url_params) | |||||
if response.status_code != 200: | |||||
logger.warning( | |||||
"Unexpected HTTP status code %s on %s: %s", | |||||
response.status_code, | |||||
response.url, | |||||
response.content, | |||||
) | |||||
response.raise_for_status() | |||||
return response | |||||
def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | ||||
last_repo_cdate: str = "1970-01-01" | last_repo_cdate: str = "1970-01-01" | ||||
if ( | if ( | ||||
self.incremental | self.incremental | ||||
and self.state is not None | and self.state is not None | ||||
and self.state.last_repo_cdate is not None | and self.state.last_repo_cdate is not None | ||||
): | ): | ||||
last_repo_cdate = self.state.last_repo_cdate.isoformat() | last_repo_cdate = self.state.last_repo_cdate.isoformat() | ||||
while True: | while True: | ||||
body = self.page_request(last_repo_cdate).json() | self.url_params["after"] = last_repo_cdate | ||||
body = self.http_request(self.url, params=self.url_params).json() | |||||
yield body["values"] | yield body["values"] | ||||
next_page_url = body.get("next") | next_page_url = body.get("next") | ||||
if next_page_url is not None: | if next_page_url is not None: | ||||
next_page_url = parse.urlparse(next_page_url) | next_page_url = parse.urlparse(next_page_url) | ||||
if not next_page_url.query: | if not next_page_url.query: | ||||
logger.warning("Failed to parse url %s", next_page_url) | logger.warning("Failed to parse url %s", next_page_url) | ||||
▲ Show 20 Lines • Show All 50 Lines • Show Last 20 Lines |