diff --git a/swh/lister/gitea/lister.py b/swh/lister/gitea/lister.py index 51084b6..e429756 100644 --- a/swh/lister/gitea/lister.py +++ b/swh/lister/gitea/lister.py @@ -1,27 +1,22 @@ -# Copyright (C) 2018-2021 The Software Heritage developers +# Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from ..gogs.lister import GogsLister logger = logging.getLogger(__name__) class GiteaLister(GogsLister): """List origins from Gitea. Gitea API documentation: https://try.gitea.io/api/swagger The API does pagination and provides navigation URLs through the 'Link' header. The default value for page size is the maximum value observed on the instances accessible at https://try.gitea.io/api/v1/ and https://codeberg.org/api/v1/.""" LISTER_NAME = "gitea" - - def on_anonymous_mode(self): - logger.warning( - "No authentication token set in configuration, using anonymous mode" - ) diff --git a/swh/lister/gogs/lister.py b/swh/lister/gogs/lister.py index f87100d..ce8a398 100644 --- a/swh/lister/gogs/lister.py +++ b/swh/lister/gogs/lister.py @@ -1,206 +1,208 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import asdict, dataclass import logging import random from typing import Any, Dict, Iterator, List, Optional, Tuple from urllib.parse import parse_qs, parse_qsl, urlencode, urljoin, urlparse import iso8601 from requests.exceptions import HTTPError from swh.scheduler.interface import SchedulerInterface from swh.scheduler.model import ListedOrigin from ..pattern import CredentialsType, Lister logger = logging.getLogger(__name__) Repo = Dict[str, Any] @dataclass class 
GogsListerPage: repos: Optional[List[Repo]] = None next_link: Optional[str] = None @dataclass class GogsListerState: last_seen_next_link: Optional[str] = None """Last link header (could be already visited) during an incremental pass.""" last_seen_repo_id: Optional[int] = None """Last repo id seen during an incremental pass.""" def _parse_page_id(url: Optional[str]) -> int: """Parse the page id from a Gogs page url.""" if url is None: return 0 return int(parse_qs(urlparse(url).query)["page"][0]) class GogsLister(Lister[GogsListerState, GogsListerPage]): """List origins from the Gogs Gogs API documentation: https://github.com/gogs/docs-api - The API is protected behind authentication so credentials/API tokens - are mandatory. It supports pagination and provides next page URL - through the 'next' value of the 'Link' header. The default value for - page size ('limit') is 10 but the maximum allowed value is 50. + The API may be protected behind authentication so credentials/API tokens can be + provided. + + The lister supports pagination and provides next page URL through the 'next' value + of the 'Link' header. The default value for page size ('limit') is 10 but the + maximum allowed value is 50. 
API can usually be found at the location: https://&lt;instance&gt;/api/v1/repos/search
return response.json(), response.links @classmethod def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]: fields_filter = ["id", "clone_url", "updated_at"] return [{k: r[k] for k in fields_filter} for r in body["data"]] def get_pages(self) -> Iterator[GogsListerPage]: page_id = 1 if self.state.last_seen_next_link is not None: page_id = _parse_page_id(self.state.last_seen_next_link) # base with trailing slash, path without leading slash for urljoin next_link: Optional[str] = urljoin(self.url, self.REPO_LIST_PATH) assert next_link is not None body, links = self.page_request( next_link, {**self.query_params, "page": page_id} ) while next_link is not None: repos = self.extract_repos(body) - assert len(links) > 0, "API changed: no Link header found" if "next" in links: next_link = links["next"]["url"] else: next_link = None # Happens for the last page yield GogsListerPage(repos=repos, next_link=next_link) if next_link is not None: body, links = self.page_request(next_link, {}) def get_origins_from_page(self, page: GogsListerPage) -> Iterator[ListedOrigin]: """Convert a page of Gogs repositories into a list of ListedOrigins""" assert self.lister_obj.id is not None assert page.repos is not None for r in page.repos: last_update = iso8601.parse_date(r["updated_at"]) yield ListedOrigin( lister_id=self.lister_obj.id, visit_type=self.VISIT_TYPE, url=r["clone_url"], last_update=last_update, ) def commit_page(self, page: GogsListerPage) -> None: last_seen_next_link = page.next_link page_id = _parse_page_id(last_seen_next_link) state_page_id = _parse_page_id(self.state.last_seen_next_link) if page_id > state_page_id: self.state.last_seen_next_link = last_seen_next_link if (page.repos is not None) and len(page.repos) > 0: self.state.last_seen_repo_id = page.repos[-1]["id"] def finalize(self) -> None: scheduler_state = self.get_state_from_scheduler() state_page_id = _parse_page_id(self.state.last_seen_next_link) scheduler_page_id = 
_parse_page_id(scheduler_state.last_seen_next_link) state_last_repo_id = self.state.last_seen_repo_id or 0 scheduler_last_repo_id = scheduler_state.last_seen_repo_id or 0 if (state_page_id >= scheduler_page_id) and ( state_last_repo_id > scheduler_last_repo_id ): self.updated = True # Marked updated only if it finds new repos diff --git a/swh/lister/gogs/tests/test_lister.py b/swh/lister/gogs/tests/test_lister.py index 4f9e370..c90c2bc 100644 --- a/swh/lister/gogs/tests/test_lister.py +++ b/swh/lister/gogs/tests/test_lister.py @@ -1,330 +1,330 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from pathlib import Path from typing import List from unittest.mock import Mock import pytest from requests import HTTPError from swh.lister.gogs.lister import GogsLister, GogsListerPage, _parse_page_id from swh.scheduler.model import ListedOrigin TRY_GOGS_URL = "https://try.gogs.io/api/v1/" def try_gogs_page(n: int): return TRY_GOGS_URL + GogsLister.REPO_LIST_PATH + f"?page={n}&limit=3" P1 = try_gogs_page(1) P2 = try_gogs_page(2) P3 = try_gogs_page(3) P4 = try_gogs_page(4) @pytest.fixture def trygogs_p1(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page1").read_text() headers = {"Link": f'<{P2}>; rel="next"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P2 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p2(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page2").read_text() headers = {"Link": f'<{P3}>; rel="next",<{P1}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, 
origin_urls @pytest.fixture def trygogs_p3(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page3").read_text() headers = {"Link": f'<{P4}>; rel="next",<{P2}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p4(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page4").read_text() headers = {"Link": f'<{P3}>; rel="prev"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=P3 ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p3_last(datadir): text = Path(datadir, "https_try.gogs.io", "repos_page3").read_text() headers = {"Link": f'<{P2}>; rel="prev",<{P1}>; rel="first"'} page_result = GogsListerPage( repos=GogsLister.extract_repos(json.loads(text)), next_link=None ) origin_urls = [r["clone_url"] for r in page_result.repos] return text, headers, page_result, origin_urls @pytest.fixture def trygogs_p3_empty(): origins_urls = [] body = {"data": [], "ok": True} headers = {"Link": f'<{P2}>; rel="prev",<{P1}>; rel="first"'} page_result = GogsListerPage(repos=GogsLister.extract_repos(body), next_link=None) text = json.dumps(body) return text, headers, page_result, origins_urls def check_listed_origins(lister_urls: List[str], scheduler_origins: List[ListedOrigin]): """Asserts that the two collections have the same origin URLs. 
Does not test last_update.""" assert set(lister_urls) == {origin.url for origin in scheduler_origins} def test_gogs_full_listing( swh_scheduler, requests_mock, mocker, trygogs_p1, trygogs_p2, trygogs_p3_last ): kwargs = dict( url=TRY_GOGS_URL, instance="try_gogs", page_size=3, api_token="secret" ) lister = GogsLister(scheduler=swh_scheduler, **kwargs) lister.get_origins_from_page: Mock = mocker.spy(lister, "get_origins_from_page") p1_text, p1_headers, p1_result, p1_origin_urls = trygogs_p1 p2_text, p2_headers, p2_result, p2_origin_urls = trygogs_p2 p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_last requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) stats = lister.run() assert stats.pages == 3 assert stats.origins == 9 calls = map(mocker.call, [p1_result, p2_result, p3_result]) lister.get_origins_from_page.assert_has_calls(list(calls)) scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls, scheduler_origins ) assert ( lister.get_state_from_scheduler().last_seen_next_link == P3 ) # P3 didn't provide any next link so it remains the last_seen_next_link def test_gogs_auth_instance( swh_scheduler, requests_mock, trygogs_p1, trygogs_p2, trygogs_p3_empty ): - """Covers token authentication, token from credentials, + """Covers without authentication, token authentication, token from credentials, instance inference from URL.""" api_token = "secret" instance = "try_gogs" # Test lister initialization without api_token or credentials: - with pytest.raises(ValueError, match="No credentials or API token provided"): - kwargs1 = dict(url=TRY_GOGS_URL, instance=instance) - GogsLister(scheduler=swh_scheduler, **kwargs1) + kwargs1 = dict(url=TRY_GOGS_URL, instance=instance) + lister = GogsLister(scheduler=swh_scheduler, **kwargs1) + assert "Authorization" 
not in lister.session.headers # Test lister initialization using api_token: kwargs2 = dict(url=TRY_GOGS_URL, api_token=api_token, instance=instance) lister = GogsLister(scheduler=swh_scheduler, **kwargs2) assert lister.session.headers["Authorization"].lower() == "token %s" % api_token # Test lister initialization with credentials and run it: creds = {"gogs": {instance: [{"username": "u", "password": api_token}]}} kwargs3 = dict(url=TRY_GOGS_URL, credentials=creds, instance=instance, page_size=3) lister = GogsLister(scheduler=swh_scheduler, **kwargs3) assert lister.session.headers["Authorization"].lower() == "token %s" % api_token assert lister.instance == "try_gogs" # setup requests mocking p1_text, p1_headers, _, _ = trygogs_p1 p2_text, p2_headers, _, _ = trygogs_p2 p3_text, p3_headers, _, _ = trygogs_p3_empty requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) # lister should run without any error and extract the origins stats = lister.run() assert stats.pages == 3 assert stats.origins == 6 @pytest.mark.parametrize("http_code", [400, 500]) def test_gogs_list_http_error( swh_scheduler, requests_mock, http_code, trygogs_p1, trygogs_p3_last ): """Test handling of some HTTP errors commonly encountered""" lister = GogsLister(scheduler=swh_scheduler, url=TRY_GOGS_URL, api_token="secret") p1_text, p1_headers, _, p1_origin_urls = trygogs_p1 p3_text, p3_headers, _, p3_origin_urls = trygogs_p3_last base_url = TRY_GOGS_URL + lister.REPO_LIST_PATH requests_mock.get( base_url, [ {"text": p1_text, "headers": p1_headers, "status_code": 200}, {"status_code": http_code}, {"text": p3_text, "headers": p3_headers, "status_code": 200}, ], ) # pages with fatal repositories should be skipped (no error raised) # See T4423 for more details if http_code == 500: lister.run() else: with pytest.raises(HTTPError): lister.run() # Both P1 and P3 origins should be listed in case of 
500 error # While in other cases, only P1 origins should be listed scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( (p1_origin_urls + p3_origin_urls) if http_code == 500 else p1_origin_urls, scheduler_origins, ) def test_gogs_incremental_lister( swh_scheduler, requests_mock, mocker, trygogs_p1, trygogs_p2, trygogs_p3, trygogs_p3_last, trygogs_p3_empty, trygogs_p4, ): kwargs = dict( url=TRY_GOGS_URL, instance="try_gogs", page_size=3, api_token="secret" ) lister = GogsLister(scheduler=swh_scheduler, **kwargs) lister.get_origins_from_page: Mock = mocker.spy(lister, "get_origins_from_page") # First listing attempt: P1 and P2 return 3 origins each # while P3 (current last page) is empty. p1_text, p1_headers, p1_result, p1_origin_urls = trygogs_p1 p2_text, p2_headers, p2_result, p2_origin_urls = trygogs_p2 p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_empty requests_mock.get(P1, text=p1_text, headers=p1_headers) requests_mock.get(P2, text=p2_text, headers=p2_headers) requests_mock.get(P3, text=p3_text, headers=p3_headers) attempt1_stats = lister.run() assert attempt1_stats.pages == 3 assert attempt1_stats.origins == 6 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p2_result.repos[-1]["id"] assert lister.updated check_listed_origins(p1_origin_urls + p2_origin_urls, scheduler_origins) lister.updated = False # Reset the flag # Second listing attempt: P3 isn't empty anymore. # The lister should restart from last state and hence revisit P3. 
p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3_last requests_mock.get(P3, text=p3_text, headers=p3_headers) lister.session.request = mocker.spy(lister.session, "request") attempt2_stats = lister.run() assert attempt2_stats.pages == 1 assert attempt2_stats.origins == 3 scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results page_id = _parse_page_id(lister_state.last_seen_next_link) query_params = lister.query_params query_params["page"] = page_id lister.session.request.assert_called_once_with( "GET", TRY_GOGS_URL + lister.REPO_LIST_PATH, params=query_params ) # All the 9 origins (3 pages) should be passed on to the scheduler: check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls, scheduler_origins ) lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p3_result.repos[-1]["id"] assert lister.updated lister.updated = False # Reset the flag # Third listing attempt: No new origins # The lister should revisit last seen page (P3) attempt3_stats = lister.run() assert attempt3_stats.pages == 1 assert attempt3_stats.origins == 3 lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P3 assert lister_state.last_seen_repo_id == p3_result.repos[-1]["id"] assert lister.updated is False # No new origins so state isn't updated. # Fourth listing attempt: Page 4 is introduced and returns 3 new origins # The lister should revisit last seen page (P3) as well as P4. 
p3_text, p3_headers, p3_result, p3_origin_urls = trygogs_p3 # new P3 points to P4 p4_text, p4_headers, p4_result, p4_origin_urls = trygogs_p4 requests_mock.get(P3, text=p3_text, headers=p3_headers) requests_mock.get(P4, text=p4_text, headers=p4_headers) attempt4_stats = lister.run() assert attempt4_stats.pages == 2 assert attempt4_stats.origins == 6 lister_state = lister.get_state_from_scheduler() assert lister_state.last_seen_next_link == P4 assert lister_state.last_seen_repo_id == p4_result.repos[-1]["id"] assert lister.updated # All the 12 origins (4 pages) should be passed on to the scheduler: scheduler_origins = swh_scheduler.get_listed_origins(lister.lister_obj.id).results check_listed_origins( p1_origin_urls + p2_origin_urls + p3_origin_urls + p4_origin_urls, scheduler_origins, )