Changeset View
Standalone View
docs/new_lister_template.py
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass | from dataclasses import asdict, dataclass | ||||
import logging | import logging | ||||
from typing import Any, Dict, Iterator, List | from typing import Any, Dict, Iterator, List | ||||
from urllib.parse import urljoin | from urllib.parse import urljoin | ||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import http_retry | |||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
from .. import USER_AGENT | |||||
from ..pattern import CredentialsType, Lister | from ..pattern import CredentialsType, Lister | ||||
# Module-level logger, named after this module per the project convention.
logger = logging.getLogger(__name__)

# Type alias for the page payloads yielded by the lister's `get_pages`.
NewForgeListerPage = List[Dict[str, Any]]
Show All 33 Lines | class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]): | ||||
): | ): | ||||
super().__init__( | super().__init__( | ||||
scheduler=scheduler, | scheduler=scheduler, | ||||
credentials=credentials, | credentials=credentials, | ||||
url=url, | url=url, | ||||
instance=instance, | instance=instance, | ||||
) | ) | ||||
self.session = requests.Session() | self.session.headers.update({"Accept": "application/json"}) | ||||
# Declare the USER_AGENT is more sysadm-friendly for the forge we list | |||||
self.session.headers.update( | |||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | |||||
) | |||||
def state_from_dict(self, d: Dict[str, Any]) -> NewForgeListerState:
    """Deserialize the scheduler-provided dict into a lister state object."""
    return NewForgeListerState(**d)
def state_to_dict(self, state: NewForgeListerState) -> Dict[str, Any]:
    """Serialize the lister state into a plain dict for the scheduler."""
    return asdict(state)
@http_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, url, params) -> requests.Response:
    """Fetch one page of results from the forge, retrying when throttled.

    The network request runs under the `http_retry` decorator, which
    handles rate limiting and transient errors up to a bounded number of
    attempts. By default it checks rate-limit responses via `requests`
    and applies a base-10 exponential backoff; waiting, retrying and
    logging strategies can be customized with `tenacity` hooks.
    """
    # Trace every listed URL to ease debugging.
    logger.debug("Fetching URL %s with params %s", url, params)
    resp = self.session.get(url, params=params)
    if resp.status_code != 200:
        # Dump the response body so failures can be diagnosed afterwards.
        logger.warning(
            "Unexpected HTTP status code %s on %s: %s",
            resp.status_code,
            resp.url,
            resp.content,
        )
        # The lister must fail on blocking errors.
        resp.raise_for_status()
    return resp
def get_pages(self) -> Iterator[NewForgeListerPage]:
    """Yield pages of listing results from the forge API, streaming them.

    Resumes from the saved lister state when incremental listing is
    supported, and follows the service's pagination until no next-page
    marker is returned. Pages are simplified early to ease testing and
    debugging.
    """
    # Start over, or resume from the position recorded in the saved state.
    current = self.state.current if self.state.current is not None else ""
    # Endpoint of the listing service; a lister may query several others.
    url = urljoin(self.url, self.EXAMPLE_PATH)
    while current is not None:
        # Parametrize the request for incremental listing.
        page = self.page_request(url, {"current": current}).json()
        # Yield the (already simplified) page in a streaming fashion.
        yield page
        # Follow pagination; a missing "next" key ends the loop.
        current = page.get("next")
Show All 34 Lines |