# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging

from ..gogs.lister import GogsLister

logger = logging.getLogger(__name__)


class GiteaLister(GogsLister):
    """List origins from Gitea.

    Gitea API documentation: https://try.gitea.io/api/swagger

    The API does pagination and provides navigation URLs through the 'Link' header.
    The default value for page size is the maximum value observed on the instances
    accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/.

    All the listing logic (pagination, state handling, origin creation) is
    inherited from :class:`GogsLister`; this class only adapts the query
    parameters and the behavior when no authentication token is available."""

    LISTER_NAME = "gitea"

    def __init__(self, *args, **kwargs):
        """Build the lister, accepting the same arguments as :class:`GogsLister`.

        Repositories are explicitly requested sorted by ascending id. The
        inherited incremental state records the id of the last repository seen
        and the last page visited, which is only meaningful if the listing order
        is stable and new repositories are appended at the end.
        """
        super().__init__(*args, **kwargs)
        # Parameters supported by the Gitea ``repos/search`` endpoint
        self.query_params.update({"sort": "id", "order": "asc"})

    def on_anonymous_mode(self) -> None:
        """Hook called by the parent constructor when neither an API token nor
        credentials are configured.

        Only a warning is logged, so the lister is still instantiated and sends
        its requests without an ``Authorization`` header.
        """
        logger.warning(
            "No authentication token set in configuration, using anonymous mode"
        )
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import json
from pathlib import Path
from typing import Dict, List, Tuple

import pytest
import requests

from swh.lister.gitea.lister import GiteaLister
from swh.lister.gogs.lister import GogsListerPage
from swh.scheduler.model import ListedOrigin

TRYGITEA_URL = "https://try.gitea.io/api/v1/"
TRYGITEA_P1_URL = TRYGITEA_URL + "repos/search?limit=3&page=1"
TRYGITEA_P2_URL = TRYGITEA_URL + "repos/search?limit=3&page=2"


@pytest.fixture
def trygitea_p1(datadir) -> Tuple[str, Dict[str, str], GogsListerPage, List[str]]:
    """First page of results: raw response text, response headers (with a
    'next' link to the second page), expected lister page and origin URLs."""
    text = Path(datadir, "https_try.gitea.io", "repos_page1").read_text()
    headers = {
        "Link": '<{p2}>; rel="next",<{p2}>; rel="last"'.format(p2=TRYGITEA_P2_URL)
    }
    page_data = json.loads(text)
    page_result = GogsListerPage(
        repos=GiteaLister.extract_repos(page_data), next_link=TRYGITEA_P2_URL
    )
    origin_urls = [r["clone_url"] for r in page_data["data"]]
    return text, headers, page_result, origin_urls


@pytest.fixture
def trygitea_p2(datadir) -> Tuple[str, Dict[str, str], GogsListerPage, List[str]]:
    """Second (last) page of results: raw response text, response headers
    (without any 'next' link), expected lister page and origin URLs."""
    text = Path(datadir, "https_try.gitea.io", "repos_page2").read_text()
    headers = {
        "Link": '<{p1}>; rel="prev",<{p1}>; rel="first"'.format(p1=TRYGITEA_P1_URL)
    }
    page_data = json.loads(text)
    page_result = GogsListerPage(
        repos=GiteaLister.extract_repos(page_data), next_link=None
    )
    origin_urls = [r["clone_url"] for r in page_data["data"]]
    return text, headers, page_result, origin_urls


def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]):
    """Asserts that the two collections have the same origin URLs.

    Does not test last_update."""

    # Compare URLs only: the check must not depend on how ListedOrigin objects
    # happen to be ordered, and a list comparison reports the differing URLs.
    assert sorted(lister_urls) == sorted(origin.url for origin in scheduler_origins)


def test_gitea_full_listing(
    swh_scheduler, requests_mock, mocker, trygitea_p1, trygitea_p2
):
    """Covers full listing of multiple pages, rate-limit, page size (required for test),
    checking page results and listed origins, and the lister state recorded in the
    scheduler."""

    kwargs = dict(url=TRYGITEA_URL, instance="try_gitea", page_size=3)
    lister = GiteaLister(scheduler=swh_scheduler, **kwargs)

    lister.get_origins_from_page = mocker.spy(lister, "get_origins_from_page")

    p1_text, p1_headers, p1_result, p1_origin_urls = trygitea_p1
    p2_text, p2_headers, p2_result, p2_origin_urls = trygitea_p2

    requests_mock.get(TRYGITEA_P1_URL, text=p1_text, headers=p1_headers)
    # The second page is first answered with a rate-limit error, then succeeds
    requests_mock.get(
        TRYGITEA_P2_URL,
        [
            {"status_code": requests.codes.too_many_requests},
            {"text": p2_text, "headers": p2_headers},
        ],
    )

    # end test setup

    stats = lister.run()

    # start test checks

    assert stats.pages == 2
    assert stats.origins == 6

    calls = [mocker.call(p1_result), mocker.call(p2_result)]
    lister.get_origins_from_page.assert_has_calls(calls)

    scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results

    check_listed_origins(p1_origin_urls + p2_origin_urls, scheduler_origins)

    lister_state = lister.get_state_from_scheduler()
    assert lister_state.last_seen_next_link == TRYGITEA_P2_URL
    assert lister_state.last_seen_repo_id == p2_result.repos[-1]["id"]


def test_gitea_auth_instance(swh_scheduler, requests_mock, trygitea_p1):
    """Covers token authentication, token from credentials, instance inference
    from URL."""

    api_token = "teapot"
    instance = "try.gitea.io"
    creds = {"gitea": {instance: [{"username": "u", "password": api_token}]}}

    kwargs1 = dict(url=TRYGITEA_URL, api_token=api_token)
    lister = GiteaLister(scheduler=swh_scheduler, **kwargs1)

    # test API token
    assert "Authorization" in lister.session.headers
    assert lister.session.headers["Authorization"].lower() == "token %s" % api_token

    kwargs2 = dict(url=TRYGITEA_URL, credentials=creds)
    lister = GiteaLister(scheduler=swh_scheduler, **kwargs2)

    # test API token from credentials
    assert "Authorization" in lister.session.headers
    assert lister.session.headers["Authorization"].lower() == "token %s" % api_token

    # test instance inference from URL
    assert lister.instance
    assert "gitea" in lister.instance  # infer something related to that

    # setup requests mocking
    p1_text, p1_headers, _, _ = trygitea_p1
    p1_headers["Link"] = p1_headers["Link"].replace("next", "")  # only 1 page

    base_url = TRYGITEA_URL + lister.REPO_LIST_PATH
    requests_mock.get(base_url, text=p1_text, headers=p1_headers)

    # now check the lister runs without error
    stats = lister.run()

    assert stats.pages == 1


@pytest.mark.parametrize("http_code", [400, 500, 502])
def test_gitea_list_http_error(swh_scheduler, requests_mock, http_code):
    """Test handling of some HTTP errors commonly encountered"""

    lister = GiteaLister(scheduler=swh_scheduler, url=TRYGITEA_URL, page_size=3)

    base_url = TRYGITEA_URL + lister.REPO_LIST_PATH
    requests_mock.get(base_url, status_code=http_code)

    with pytest.raises(requests.HTTPError):
        lister.run()

    # Nothing must have been recorded when the listing failed
    scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results
    assert len(scheduler_origins) == 0
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import asdict, dataclass
import logging
import random
from typing import Any, Dict, Iterator, List, Optional
from urllib.parse import parse_qs, urljoin, urlparse

import iso8601
import requests
from tenacity.before_sleep import before_sleep_log

from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from .. import USER_AGENT
from ..pattern import CredentialsType, Lister

logger = logging.getLogger(__name__)

Repo = Dict[str, Any]


@dataclass
class GogsListerPage:
    """One page of listing results, as yielded by :meth:`GogsLister.get_pages`."""

    # Repositories of the page, reduced to the fields kept by extract_repos
    repos: Optional[List[Repo]] = None
    # URL of the following page, None when this is the last page
    next_link: Optional[str] = None


@dataclass
class GogsListerState:
    last_seen_next_link: Optional[str] = None
    """Last link header (could be already visited) during an incremental pass."""
    last_seen_repo_id: Optional[int] = None
    """Last repo id seen during an incremental pass."""


def _parse_page_id(url: Optional[str]) -> int:
    """Parse the page id from a Gogs page url.

    Returns 0 when ``url`` is None, otherwise the integer value of the first
    ``page`` query parameter of the URL.

    Raises:
        KeyError: the URL has no ``page`` query parameter
        ValueError: the ``page`` query parameter is not an integer
    """
    if url is None:
        return 0

    return int(parse_qs(urlparse(url).query)["page"][0])


class GogsLister(Lister[GogsListerState, GogsListerPage]):
    """List origins from Gogs.

    Gogs API documentation: https://github.com/gogs/docs-api

    The API is protected behind authentication so credentials/API tokens are mandatory
    (subclasses can relax this by overriding :meth:`on_anonymous_mode`).
    It supports pagination and provides next page URL through the 'next' value of
    the 'Link' header. The default value for page size ('limit') is 10 but the maximum
    allowed value is 50.
    """

    LISTER_NAME = "gogs"

    VISIT_TYPE = "git"

    REPO_LIST_PATH = "repos/search"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        url: str,
        instance: Optional[str] = None,
        api_token: Optional[str] = None,
        page_size: int = 50,
        credentials: CredentialsType = None,
    ):
        """
        Args:
            scheduler: scheduler instance, passed to the parent constructor
            url: base URL of the API (with a trailing slash, as it is joined
                with ``REPO_LIST_PATH`` using ``urljoin``)
            instance: instance name, passed to the parent constructor
            api_token: API token sent in the ``Authorization`` header; when
                None, the password of a randomly picked credential is used
            page_size: value of the ``limit`` query parameter
            credentials: credentials, passed to the parent constructor
        """
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            url=url,
            instance=instance,
        )

        self.query_params = {
            "limit": page_size,
        }

        self.api_token = api_token
        if self.api_token is None:
            if self.credentials:
                cred = random.choice(self.credentials)
                username = cred.get("username")
                self.api_token = cred["password"]
                logger.warning(
                    "Using authentication credentials from user %s", username or "???"
                )
            else:
                # Raises an error on Gogs, or a warning on Gitea
                self.on_anonymous_mode()

        # NOTE(review): not read anywhere in this module -- confirm it is still needed
        self.max_page_limit = 2

        self.session = requests.Session()
        self.session.headers.update(
            {
                "Accept": "application/json",
                "User-Agent": USER_AGENT,
            }
        )

        # No Authorization header at all in anonymous mode
        if self.api_token:
            self.session.headers["Authorization"] = f"token {self.api_token}"

    def on_anonymous_mode(self) -> None:
        """Hook called by the constructor when neither an API token nor
        credentials are available.

        Raises:
            ValueError: always, in this class
        """
        raise ValueError("No credentials or API token provided")

    def state_from_dict(self, d: Dict[str, Any]) -> GogsListerState:
        """Build the lister state from its dict form."""
        return GogsListerState(**d)

    def state_to_dict(self, state: GogsListerState) -> Dict[str, Any]:
        """Convert the lister state to its dict form."""
        return asdict(state)

    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
    def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
        """Send a GET request to ``url`` with query parameters ``params``.

        Raises:
            requests.HTTPError: the response has an error status code
        """

        logger.debug("Fetching URL %s with params %s", url, params)

        response = self.session.get(url, params=params)

        if response.status_code != 200:
            logger.warning(
                "Unexpected HTTP status code %s on %s: %s",
                response.status_code,
                response.url,
                response.content,
            )
        response.raise_for_status()

        return response

    @classmethod
    def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]:
        """Keep only the ``id``, ``clone_url`` and ``updated_at`` fields of each
        repository found under the ``data`` key of a response body."""
        fields_filter = ["id", "clone_url", "updated_at"]
        return [{k: r[k] for k in fields_filter} for r in body["data"]]

    def get_pages(self) -> Iterator[GogsListerPage]:
        """Yield the result pages, following the 'next' links of the responses.

        The listing starts at page 1, or at the page referenced by the next link
        recorded in the lister state when there is one.
        """
        page_id = 1
        if self.state.last_seen_next_link is not None:
            page_id = _parse_page_id(self.state.last_seen_next_link)

        # base with trailing slash, path without leading slash for urljoin
        url = urljoin(self.url, self.REPO_LIST_PATH)
        response = self.page_request(url, {**self.query_params, "page": page_id})

        while True:
            repos = self.extract_repos(response.json())

            # A response without any 'Link' header has no 'next' link either: it
            # is handled as the last page instead of aborting the listing.
            if not response.links:
                logger.debug(
                    "No Link header in response from %s, assuming last page",
                    response.url,
                )
            next_link: Optional[str] = response.links.get("next", {}).get("url")

            yield GogsListerPage(repos=repos, next_link=next_link)

            if next_link is None:
                break  # Happens for the last page

            # The next link already carries its own query parameters
            response = self.page_request(next_link, {})

    def get_origins_from_page(self, page: GogsListerPage) -> Iterator[ListedOrigin]:
        """Convert a page of Gogs repositories into a list of ListedOrigins"""
        assert self.lister_obj.id is not None
        assert page.repos is not None

        for r in page.repos:
            last_update = iso8601.parse_date(r["updated_at"])

            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                visit_type=self.VISIT_TYPE,
                url=r["clone_url"],
                last_update=last_update,
            )

    def commit_page(self, page: GogsListerPage) -> None:
        """Update the lister state from ``page``.

        The next link is recorded only when it references a page beyond the one
        already in the state; the id of the last repository of the page is
        recorded whenever the page is not empty.
        """
        last_seen_next_link = page.next_link

        page_id = _parse_page_id(last_seen_next_link)
        state_page_id = _parse_page_id(self.state.last_seen_next_link)

        if page_id > state_page_id:
            self.state.last_seen_next_link = last_seen_next_link

        if (page.repos is not None) and len(page.repos) > 0:
            self.state.last_seen_repo_id = page.repos[-1]["id"]

    def finalize(self) -> None:
        """Compare the current state with the one stored in the scheduler and set
        ``self.updated`` when the page position did not go backwards and a
        higher repository id was seen."""
        scheduler_state = self.get_state_from_scheduler()

        state_page_id = _parse_page_id(self.state.last_seen_next_link)
        scheduler_page_id = _parse_page_id(scheduler_state.last_seen_next_link)

        # A missing repo id compares as 0
        state_last_repo_id = self.state.last_seen_repo_id or 0
        scheduler_last_repo_id = scheduler_state.last_seen_repo_id or 0

        if (state_page_id >= scheduler_page_id) and (
            state_last_repo_id > scheduler_last_repo_id
        ):
            self.updated = True  # Marked updated only if it finds new repos