Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/gogs/lister.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import asdict, dataclass | from dataclasses import asdict, dataclass | ||||
import logging | import logging | ||||
import random | import random | ||||
from typing import Any, Dict, Iterator, List, Optional, Tuple | from typing import Any, Dict, Iterator, List, Optional, Tuple | ||||
from urllib.parse import parse_qs, parse_qsl, urlencode, urljoin, urlparse | from urllib.parse import parse_qs, parse_qsl, urlencode, urljoin, urlparse | ||||
import iso8601 | import iso8601 | ||||
import requests | from requests.exceptions import HTTPError | ||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import http_retry | |||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from .. import USER_AGENT | |||||
from ..pattern import CredentialsType, Lister | from ..pattern import CredentialsType, Lister | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
Repo = Dict[str, Any] | Repo = Dict[str, Any] | ||||
@dataclass | @dataclass | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | ): | ||||
cred = random.choice(self.credentials) | cred = random.choice(self.credentials) | ||||
username = cred.get("username") | username = cred.get("username") | ||||
self.api_token = cred["password"] | self.api_token = cred["password"] | ||||
logger.info("Using authentication credentials from user %s", username) | logger.info("Using authentication credentials from user %s", username) | ||||
else: | else: | ||||
# Raises an error on Gogs, or a warning on Gitea | # Raises an error on Gogs, or a warning on Gitea | ||||
self.on_anonymous_mode() | self.on_anonymous_mode() | ||||
self.session = requests.Session() | self.session.headers.update({"Accept": "application/json"}) | ||||
self.session.headers.update( | |||||
{ | |||||
"Accept": "application/json", | |||||
"User-Agent": USER_AGENT, | |||||
} | |||||
) | |||||
if self.api_token: | if self.api_token: | ||||
self.session.headers["Authorization"] = f"token {self.api_token}" | self.session.headers["Authorization"] = f"token {self.api_token}" | ||||
def on_anonymous_mode(self): | def on_anonymous_mode(self): | ||||
raise ValueError("No credentials or API token provided") | raise ValueError("No credentials or API token provided") | ||||
def state_from_dict(self, d: Dict[str, Any]) -> GogsListerState: | def state_from_dict(self, d: Dict[str, Any]) -> GogsListerState: | ||||
return GogsListerState(**d) | return GogsListerState(**d) | ||||
def state_to_dict(self, state: GogsListerState) -> Dict[str, Any]: | def state_to_dict(self, state: GogsListerState) -> Dict[str, Any]: | ||||
return asdict(state) | return asdict(state) | ||||
@http_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | |||||
def page_request( | def page_request( | ||||
self, url: str, params: Dict[str, Any] | self, url: str, params: Dict[str, Any] | ||||
) -> Tuple[Dict[str, Any], Dict[str, Any]]: | ) -> Tuple[Dict[str, Any], Dict[str, Any]]: | ||||
logger.debug("Fetching URL %s with params %s", url, params) | logger.debug("Fetching URL %s with params %s", url, params) | ||||
response = self.session.get(url, params=params) | try: | ||||
response = self.http_request(url, params=params) | |||||
if response.status_code != 200: | except HTTPError as http_error: | ||||
logger.warning( | |||||
"Unexpected HTTP status code %s on %s: %s", | |||||
response.status_code, | |||||
response.url, | |||||
response.content, | |||||
) | |||||
if ( | if ( | ||||
response.status_code == 500 | http_error.response.status_code == 500 | ||||
): # Temporary hack for skipping fatal repos (T4423) | ): # Temporary hack for skipping fatal repos (T4423) | ||||
url_parts = urlparse(url) | url_parts = urlparse(url) | ||||
query: Dict[str, Any] = dict(parse_qsl(url_parts.query)) | query: Dict[str, Any] = dict(parse_qsl(url_parts.query)) | ||||
query.update({"page": _parse_page_id(url) + 1}) | query.update({"page": _parse_page_id(url) + 1}) | ||||
next_page_link = url_parts._replace(query=urlencode(query)).geturl() | next_page_link = url_parts._replace(query=urlencode(query)).geturl() | ||||
body: Dict[str, Any] = {"data": []} | body: Dict[str, Any] = {"data": []} | ||||
links = {"next": {"url": next_page_link}} | links = {"next": {"url": next_page_link}} | ||||
return body, links | return body, links | ||||
else: | else: | ||||
response.raise_for_status() | raise | ||||
return response.json(), response.links | return response.json(), response.links | ||||
@classmethod | @classmethod | ||||
def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]: | def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]: | ||||
fields_filter = ["id", "clone_url", "updated_at"] | fields_filter = ["id", "clone_url", "updated_at"] | ||||
return [{k: r[k] for k in fields_filter} for r in body["data"]] | return [{k: r[k] for k in fields_filter} for r in body["data"]] | ||||
def get_pages(self) -> Iterator[GogsListerPage]: | def get_pages(self) -> Iterator[GogsListerPage]: | ||||
page_id = 1 | page_id = 1 | ||||
if self.state.last_seen_next_link is not None: | if self.state.last_seen_next_link is not None: | ||||
page_id = _parse_page_id(self.state.last_seen_next_link) | page_id = _parse_page_id(self.state.last_seen_next_link) | ||||
# base with trailing slash, path without leading slash for urljoin | # base with trailing slash, path without leading slash for urljoin | ||||
next_link: Optional[str] = urljoin(self.url, self.REPO_LIST_PATH) | next_link: Optional[str] = urljoin(self.url, self.REPO_LIST_PATH) | ||||
assert next_link is not None | |||||
body, links = self.page_request( | body, links = self.page_request( | ||||
next_link, {**self.query_params, "page": page_id} | next_link, {**self.query_params, "page": page_id} | ||||
) | ) | ||||
while next_link is not None: | while next_link is not None: | ||||
repos = self.extract_repos(body) | repos = self.extract_repos(body) | ||||
▲ Show 20 Lines • Show All 51 Lines • Show Last 20 Lines |