Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/gogs/lister.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import asdict, dataclass | |||||
import logging | import logging | ||||
import random | import random | ||||
from typing import Any, Dict, Iterator, List, Optional | from typing import Any, Dict, Iterator, List, Optional | ||||
from urllib.parse import urljoin | from urllib.parse import parse_qs, urljoin, urlparse | ||||
import iso8601 | import iso8601 | ||||
import requests | import requests | ||||
from tenacity.before_sleep import before_sleep_log | from tenacity.before_sleep import before_sleep_log | ||||
from swh.lister.utils import throttling_retry | from swh.lister.utils import throttling_retry | ||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from .. import USER_AGENT | from .. import USER_AGENT | ||||
from ..pattern import CredentialsType, StatelessLister | from ..pattern import CredentialsType, Lister | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Aliasing page results returned by `GogsLister.get_pages` method | Repo = Dict[str, Any] | ||||
GogsListerPage = List[Dict[str, Any]] | |||||
class GogsLister(StatelessLister[GogsListerPage]): | @dataclass | ||||
class GogsListerPage: | |||||
repos: Optional[List[Repo]] = None | |||||
next_link: Optional[str] = None | |||||
@dataclass | |||||
class GogsListerState: | |||||
last_seen_next_link: Optional[str] = None | |||||
"""Last link header (could be already visited) during an incremental pass.""" | |||||
last_seen_repo_id: Optional[int] = None | |||||
"""Last repo id seen during an incremental pass.""" | |||||
def _parse_page_id(url: Optional[str]) -> int: | |||||
"""Parse the page id from a Gogs page url.""" | |||||
if url is None: | |||||
return 0 | |||||
return int(parse_qs(urlparse(url).query)["page"][0]) | |||||
class GogsLister(Lister[GogsListerState, GogsListerPage]): | |||||
"""List origins from the Gogs | """List origins from the Gogs | ||||
Gogs API documentation: https://github.com/gogs/docs-api | Gogs API documentation: https://github.com/gogs/docs-api | ||||
The API is protected behind authentication so credentials/API tokens | The API is protected behind authentication so credentials/API tokens | ||||
are mandatory. It supports pagination and provides next page URL | are mandatory. It supports pagination and provides next page URL | ||||
through the 'next' value of the 'Link' header. The default value for | through the 'next' value of the 'Link' header. The default value for | ||||
Show All 19 Lines | ): | ||||
scheduler=scheduler, | scheduler=scheduler, | ||||
credentials=credentials, | credentials=credentials, | ||||
url=url, | url=url, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
self.query_params = { | self.query_params = { | ||||
"limit": page_size, | "limit": page_size, | ||||
"page": 1, | |||||
} | } | ||||
self.api_token = api_token | self.api_token = api_token | ||||
if self.api_token is None: | if self.api_token is None: | ||||
if len(self.credentials) > 0: | if len(self.credentials) > 0: | ||||
cred = random.choice(self.credentials) | cred = random.choice(self.credentials) | ||||
username = cred.get("username") | username = cred.get("username") | ||||
Show All 10 Lines | ): | ||||
self.session.headers.update( | self.session.headers.update( | ||||
{ | { | ||||
"Accept": "application/json", | "Accept": "application/json", | ||||
"User-Agent": USER_AGENT, | "User-Agent": USER_AGENT, | ||||
"Authorization": f"token {self.api_token}", | "Authorization": f"token {self.api_token}", | ||||
} | } | ||||
) | ) | ||||
def state_from_dict(self, d: Dict[str, Any]) -> GogsListerState: | |||||
return GogsListerState(**d) | |||||
def state_to_dict(self, state: GogsListerState) -> Dict[str, Any]: | |||||
return asdict(state) | |||||
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | ||||
def page_request(self, url, params) -> requests.Response: | def page_request(self, url, params) -> requests.Response: | ||||
logger.debug("Fetching URL %s with params %s", url, params) | logger.debug("Fetching URL %s with params %s", url, params) | ||||
response = self.session.get(url, params=params) | response = self.session.get(url, params=params) | ||||
if response.status_code != 200: | if response.status_code != 200: | ||||
logger.warning( | logger.warning( | ||||
"Unexpected HTTP status code %s on %s: %s", | "Unexpected HTTP status code %s on %s: %s", | ||||
response.status_code, | response.status_code, | ||||
response.url, | response.url, | ||||
response.content, | response.content, | ||||
) | ) | ||||
response.raise_for_status() | response.raise_for_status() | ||||
return response | return response | ||||
@classmethod | @classmethod | ||||
def results_simplified(cls, body: Dict[str, GogsListerPage]) -> GogsListerPage: | def extract_repos(cls, body: Dict[str, Any]) -> List[Repo]: | ||||
fields_filter = ["id", "clone_url", "updated_at"] | fields_filter = ["id", "clone_url", "updated_at"] | ||||
return [{k: r[k] for k in fields_filter} for r in body["data"]] | return [{k: r[k] for k in fields_filter} for r in body["data"]] | ||||
def get_pages(self) -> Iterator[GogsListerPage]: | def get_pages(self) -> Iterator[GogsListerPage]: | ||||
# base with trailing slash, path without leading slash for urljoin | page_id = 1 | ||||
url = urljoin(self.url, self.REPO_LIST_PATH) | if self.state.last_seen_next_link is not None: | ||||
vlorentz: not covered? | |||||
response = self.page_request(url, self.query_params) | page_id = _parse_page_id(self.state.last_seen_next_link) | ||||
while True: | # base with trailing slash, path without leading slash for urljoin | ||||
page_results = self.results_simplified(response.json()) | next_link: Optional[str] = urljoin(self.url, self.REPO_LIST_PATH) | ||||
response = self.page_request(next_link, {**self.query_params, "page": page_id}) | |||||
yield page_results | while next_link is not None: | ||||
repos = self.extract_repos(response.json()) | |||||
assert len(response.links) > 0, "API changed: no Link header found" | assert len(response.links) > 0, "API changed: no Link header found" | ||||
if "next" in response.links: | if "next" in response.links: | ||||
url = response.links["next"]["url"] | next_link = response.links["next"]["url"] | ||||
else: | else: | ||||
break | next_link = None # Happens for the last page | ||||
response = self.page_request(url, {}) | yield GogsListerPage(repos=repos, next_link=next_link) | ||||
if next_link is not None: | |||||
response = self.page_request(next_link, {}) | |||||
def get_origins_from_page(self, page: GogsListerPage) -> Iterator[ListedOrigin]: | def get_origins_from_page(self, page: GogsListerPage) -> Iterator[ListedOrigin]: | ||||
"""Convert a page of Gogs repositories into a list of ListedOrigins""" | """Convert a page of Gogs repositories into a list of ListedOrigins""" | ||||
assert self.lister_obj.id is not None | assert self.lister_obj.id is not None | ||||
assert page.repos is not None | |||||
for repo in page: | for r in page.repos: | ||||
last_update = iso8601.parse_date(repo["updated_at"]) | last_update = iso8601.parse_date(r["updated_at"]) | ||||
yield ListedOrigin( | yield ListedOrigin( | ||||
lister_id=self.lister_obj.id, | lister_id=self.lister_obj.id, | ||||
visit_type=self.VISIT_TYPE, | visit_type=self.VISIT_TYPE, | ||||
url=repo["clone_url"], | url=r["clone_url"], | ||||
last_update=last_update, | last_update=last_update, | ||||
) | ) | ||||
def commit_page(self, page: GogsListerPage) -> None: | |||||
last_seen_next_link = page.next_link | |||||
page_id = _parse_page_id(last_seen_next_link) | |||||
state_page_id = _parse_page_id(self.state.last_seen_next_link) | |||||
if page_id > state_page_id: | |||||
self.state.last_seen_next_link = last_seen_next_link | |||||
if (page.repos is not None) and len(page.repos) > 0: | |||||
self.state.last_seen_repo_id = page.repos[-1]["id"] | |||||
def finalize(self) -> None: | |||||
scheduler_state = self.get_state_from_scheduler() | |||||
state_page_id = _parse_page_id(self.state.last_seen_next_link) | |||||
scheduler_page_id = _parse_page_id(scheduler_state.last_seen_next_link) | |||||
state_last_repo_id = self.state.last_seen_repo_id or 0 | |||||
scheduler_last_repo_id = scheduler_state.last_seen_repo_id or 0 | |||||
if (state_page_id >= scheduler_page_id) and ( | |||||
state_last_repo_id > scheduler_last_repo_id | |||||
): | |||||
self.updated = True # Marked updated only if it finds new repos |
not covered?