Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/gitea/lister.py
# Copyright (C) 2018-2021 The Software Heritage developers | # Copyright (C) 2018-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
import random | |||||
from typing import Any, Dict, Iterator, List, Optional | |||||
from urllib.parse import urljoin | |||||
import iso8601 | |||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import throttling_retry | |||||
from swh.scheduler.interface import SchedulerInterface | |||||
from swh.scheduler.model import ListedOrigin | |||||
from .. import USER_AGENT | from ..gogs.lister import GogsLister | ||||
from ..pattern import CredentialsType, StatelessLister | |||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
RepoListPage = List[Dict[str, Any]] | |||||
class GiteaLister(GogsLister): | |||||
class GiteaLister(StatelessLister[RepoListPage]): | |||||
"""List origins from Gitea. | """List origins from Gitea. | ||||
Gitea API documentation: https://try.gitea.io/api/swagger | Gitea API documentation: https://try.gitea.io/api/swagger | ||||
The API does pagination and provides navigation URLs through the 'Link' header. | The API does pagination and provides navigation URLs through the 'Link' header. | ||||
The default value for page size is the maximum value observed on the instances | The default value for page size is the maximum value observed on the instances | ||||
accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/.""" | accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/.""" | ||||
LISTER_NAME = "gitea" | LISTER_NAME = "gitea" | ||||
REPO_LIST_PATH = "repos/search" | def on_anonymous_mode(self): | ||||
def __init__( | |||||
self, | |||||
scheduler: SchedulerInterface, | |||||
url: str, | |||||
instance: Optional[str] = None, | |||||
api_token: Optional[str] = None, | |||||
page_size: int = 50, | |||||
credentials: CredentialsType = None, | |||||
): | |||||
super().__init__( | |||||
scheduler=scheduler, | |||||
credentials=credentials, | |||||
url=url, | |||||
instance=instance, | |||||
) | |||||
self.query_params = { | |||||
"sort": "id", | |||||
"order": "asc", | |||||
"limit": page_size, | |||||
"page": 1, | |||||
} | |||||
self.session = requests.Session() | |||||
self.session.headers.update( | |||||
{ | |||||
"Accept": "application/json", | |||||
"User-Agent": USER_AGENT, | |||||
} | |||||
) | |||||
if api_token is None: | |||||
if len(self.credentials) > 0: | |||||
cred = random.choice(self.credentials) | |||||
username = cred.get("username") | |||||
api_token = cred["password"] | |||||
logger.warning( | |||||
"Using authentication token from user %s", username or "???" | |||||
) | |||||
else: | |||||
logger.warning( | logger.warning( | ||||
"No authentication token set in configuration, using anonymous mode" | "No authentication token set in configuration, using anonymous mode" | ||||
) | ) | ||||
if api_token: | |||||
self.session.headers["Authorization"] = "Token %s" % api_token | |||||
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | |||||
def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response: | |||||
logger.info("Fetching URL %s with params %s", url, params) | |||||
response = self.session.get(url, params=params) | |||||
if response.status_code != 200: | |||||
logger.warning( | |||||
"Unexpected HTTP status code %s on %s: %s", | |||||
response.status_code, | |||||
response.url, | |||||
response.content, | |||||
) | |||||
response.raise_for_status() | |||||
return response | |||||
@classmethod | |||||
def results_simplified(cls, body: Dict[str, RepoListPage]) -> RepoListPage: | |||||
fields_filter = ["id", "clone_url", "updated_at"] | |||||
return [{k: r[k] for k in fields_filter} for r in body["data"]] | |||||
def get_pages(self) -> Iterator[RepoListPage]: | |||||
# base with trailing slash, path without leading slash for urljoin | |||||
url: str = urljoin(self.url, self.REPO_LIST_PATH) | |||||
response = self.page_request(url, self.query_params) | |||||
while True: | |||||
page_results = self.results_simplified(response.json()) | |||||
yield page_results | |||||
assert len(response.links) > 0, "API changed: no Link header found" | |||||
if "next" in response.links: | |||||
url = response.links["next"]["url"] | |||||
else: | |||||
# last page | |||||
break | |||||
response = self.page_request(url, {}) | |||||
def get_origins_from_page(self, page: RepoListPage) -> Iterator[ListedOrigin]: | |||||
"""Convert a page of Gitea repositories into a list of ListedOrigins.""" | |||||
assert self.lister_obj.id is not None | |||||
for repo in page: | |||||
last_update = iso8601.parse_date(repo["updated_at"]) | |||||
yield ListedOrigin( | |||||
lister_id=self.lister_obj.id, | |||||
url=repo["clone_url"], | |||||
visit_type="git", | |||||
last_update=last_update, | |||||
) |