Changeset View
Changeset View
Standalone View
Standalone View
docs/new_lister_template.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from dataclasses import asdict, dataclass | |||||
import logging | |||||
from typing import Any, Dict, Iterator, List | |||||
from urllib.parse import urljoin | |||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import throttling_retry | |||||
from swh.scheduler.interface import SchedulerInterface | |||||
from swh.scheduler.model import ListedOrigin | |||||
from .. import USER_AGENT | |||||
from ..pattern import CredentialsType, Lister | |||||
logger = logging.getLogger(__name__) | |||||
# Aliasing the page results returned by `get_pages` method from the lister. | |||||
NewForgeListerPage = List[Dict[str, Any]] | |||||
@dataclass | |||||
class NewForgeListerState: | |||||
"""The NewForgeLister instance state. This is used for incremental listing. | |||||
""" | |||||
current: str = "" | |||||
"""Id of the last origin listed on an incremental pass""" | |||||
# If there is no need to keep state, subclass StatelessLister[NewForgeListerPage] | |||||
class NewForgeLister(Lister[NewForgeListerState, NewForgeListerPage]): | |||||
"""List origins from the "NewForge" forge. | |||||
""" | |||||
# Part of the lister API, that identifies this lister | |||||
LISTER_NAME = "" | |||||
# (Optional) CVS type of the origins listed by this lister, if constant | |||||
VISIT_TYPE = "" | |||||
# Instance URLs include the hostname and the common path prefix of processed URLs | |||||
EXAMPLE_BASE_URL = "https://netloc/api/v1/" | |||||
# Path of a specific resource to process, to join the base URL with | |||||
EXAMPLE_PATH = "origins/list" | |||||
def __init__( | |||||
self, | |||||
# Required | |||||
scheduler: SchedulerInterface, | |||||
# Instance URL, required for multi-instances listers (e.g gitlab, ...) | |||||
url: str, | |||||
# Instance name (free form) required for multi-instance listers, | |||||
# or computed from `url` | |||||
instance: str, | |||||
# Required whether lister supports authentication or not | |||||
credentials: CredentialsType = None, | |||||
): | |||||
super().__init__( | |||||
scheduler=scheduler, credentials=credentials, url=url, instance=instance, | |||||
) | |||||
self.session = requests.Session() | |||||
# Declare the USER_AGENT is more sysadm-friendly for the forge we list | |||||
self.session.headers.update( | |||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | |||||
) | |||||
def state_from_dict(self, d: Dict[str, Any]) -> NewForgeListerState: | |||||
return NewForgeListerState(**d) | |||||
def state_to_dict(self, state: NewForgeListerState) -> Dict[str, Any]: | |||||
return asdict(state) | |||||
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | |||||
def page_request(self, url, params) -> requests.Response: | |||||
# Do the network resource request under a retrying decorator | |||||
# to handle rate limiting and transient errors up to a limit. | |||||
# `throttling_retry` by default use the `requests` library to check | |||||
# only for rate-limit and a base-10 exponential waiting strategy. | |||||
# This can be customized by passed waiting, retrying and logging strategies | |||||
# as functions. See the `tenacity` library documentation. | |||||
# Log listed URL to ease debugging | |||||
logger.debug("Fetching URL %s with params %s", url, params) | |||||
response = self.session.get(url, params=params) | |||||
if response.status_code != 200: | |||||
# Log response content to ease debugging | |||||
logger.warning( | |||||
"Unexpected HTTP status code %s on %s: %s", | |||||
response.status_code, | |||||
response.url, | |||||
response.content, | |||||
) | |||||
# The lister must fail on blocking errors | |||||
response.raise_for_status() | |||||
return response | |||||
def get_pages(self) -> Iterator[NewForgeListerPage]: | |||||
# The algorithm depends on the service, but should request data reliably, | |||||
# following pagination if relevant and yielding pages in a streaming fashion. | |||||
# If incremental listing is supported, initialize from saved lister state. | |||||
# Make use of any next page URL provided. | |||||
# Simplify the results early to ease testing and debugging. | |||||
# Initialize from the lister saved state | |||||
current = "" | |||||
if self.state.current is not None: | |||||
current = self.state.current | |||||
# Construct the URL of a service endpoint, the lister can have others to fetch | |||||
url = urljoin(self.url, self.EXAMPLE_PATH) | |||||
while current is not None: | |||||
# Parametrize the request for incremental listing | |||||
body = self.page_request(url, {"current": current}).json() | |||||
# Simplify the page if possible to only the necessary elements | |||||
# and yield it | |||||
yield body | |||||
# Get the next page parameter or end the loop when there is none | |||||
current = body.get("next") | |||||
def get_origins_from_page(self, page: NewForgeListerPage) -> Iterator[ListedOrigin]: | |||||
"""Convert a page of NewForgeLister repositories into a list of ListedOrigins""" | |||||
assert self.lister_obj.id is not None | |||||
for element in page: | |||||
yield ListedOrigin( | |||||
# Required. Should use this value. | |||||
lister_id=self.lister_obj.id, | |||||
# Required. Visit type of the currently processed origin | |||||
visit_type=self.VISIT_TYPE, | |||||
# Required. URL corresponding to the origin for loaders to ingest | |||||
url=..., | |||||
# Should get it if the service provides it and if it induces no | |||||
# substantial additional processing cost | |||||
last_update=..., | |||||
) | |||||
def commit_page(self, page: NewForgeListerPage) -> None: | |||||
# Update the lister state to the latest `current` | |||||
current = page[-1]["current"] | |||||
if current > self.state.current: | |||||
self.state.current = current | |||||
def finalize(self) -> None: | |||||
# Pull fresh lister state from the scheduler backend, in case multiple | |||||
# listers run concurrently | |||||
scheduler_state = self.get_state_from_scheduler() | |||||
# Update the lister state in the backend only if `current` is fresher than | |||||
# the one stored in the database. | |||||
if self.state.current > scheduler_state.current: | |||||
self.updated = True |