diff --git a/docs/new_lister_template.py b/docs/new_lister_template.py
new file mode 100644
--- /dev/null
+++ b/docs/new_lister_template.py
@@ -0,0 +1,180 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import asdict, dataclass
+import logging
+from typing import Any, Dict, Iterator, List
+from urllib.parse import urljoin
+
+import requests
+from tenacity.before_sleep import before_sleep_log
+
+from swh.lister.utils import throttling_retry
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.model import ListedOrigin
+
+from .. import USER_AGENT
+from ..pattern import CredentialsType, Lister
+
+logger = logging.getLogger(__name__)
+
+# Type alias of a page as returned by `get_pages`. Optional, but good for readability.
+MyPageType = List[Dict[str, Any]]
+
+
+@dataclass
+class MyListerState:
+    """State of My lister"""
+
+    # Holds the state that needs to be kept to do incremental listing
+
+    current: str = ""
+    """Id of the last origin listed on an incremental pass"""
+
+
+# If there is no need to keep state, subclass StatelessLister[MyPageType]
+class MyLister(Lister[MyListerState, MyPageType]):
+    """List origins from My software source.
+
+    This is a template: replace the ``EXAMPLE_*`` values, the response parsing in
+    :meth:`get_pages` and the ``...`` placeholders in :meth:`get_origins_from_page`
+    with the specifics of the listed service.
+    """
+
+    # Part of the lister API, that identifies this lister
+    LISTER_NAME = ""
+    # Not required, VCS type of the origins listed by this lister, if constant
+    VISIT_TYPE = ""
+
+    # Instance URLs include the hostname and the common path prefix of processed URLs
+    EXAMPLE_BASE_URL = "https://netloc/api"
+    # Path of a specific resource to process, to join the base URL with
+    EXAMPLE_PATH = "origins/list"
+
+    def __init__(
+        self,
+        # Required
+        scheduler: SchedulerInterface,
+        # Instance URL, required for multi-instance listers
+        url: str,
+        # Instance name (free form) required for multi-instance listers,
+        # or computed from `url`
+        instance: str,
+        # Must declare this argument if the lister supports authentication
+        credentials: CredentialsType = None,
+    ):
+        super().__init__(
+            scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+        )
+
+        # not required to set up a session, but must use the user-agent
+        self.session = requests.Session()
+        self.session.headers.update(
+            {"Accept": "application/json", "User-Agent": USER_AGENT}
+        )
+
+    def state_from_dict(self, d: Dict[str, Any]) -> MyListerState:
+        return MyListerState(**d)
+
+    def state_to_dict(self, state: MyListerState) -> Dict[str, Any]:
+        return asdict(state)
+
+    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
+    def page_request(self, url, params) -> requests.Response:
+        # Do the network resource request under a retrying decorator
+        # to handle rate limiting and transient errors up to a limit.
+        # `throttling_retry` by default uses the `requests` library to check
+        # only for rate-limit and a base-10 exponential waiting strategy.
+        # This can be customized by passing waiting, retrying and logging strategies
+        # as functions. See the `tenacity` library documentation.
+
+        # Log URL to fetch to ease debugging
+        logger.debug("Fetching URL %s with params %s", url, params)
+        response = self.session.get(url, params=params)
+
+        if response.status_code != 200:
+            # Log response content to ease debugging
+            logger.warning(
+                "Unexpected HTTP status code %s on %s: %s",
+                response.status_code,
+                response.url,
+                response.content,
+            )
+        # The lister must fail on blocking errors
+        response.raise_for_status()
+
+        return response
+
+    def get_pages(self) -> Iterator[MyPageType]:
+        # The algorithm depends on the service, but should request data reliably,
+        # following pagination if relevant and yielding pages in a streaming fashion.
+        # If incremental listing is supported, initialize from saved lister state.
+        # Make use of any next page URL provided.
+        # Simplify the results early to ease testing and debugging.
+
+        # Initialize from the lister saved state
+        current = ""
+        if self.state.current is not None:
+            current = self.state.current
+
+        # Construct the URL of a service endpoint, the lister can have others to fetch.
+        # `urljoin` replaces the last path segment of a base URL lacking a trailing
+        # slash (e.g. "https://netloc/api" would yield "https://netloc/origins/list"),
+        # so make sure the instance URL ends with one.
+        url = urljoin(self.url.rstrip("/") + "/", self.EXAMPLE_PATH)
+
+        while current is not None:
+            # Parametrize the request for incremental listing
+            body = self.page_request(url, {"current": current}).json()
+
+            # Simplify the page if possible to only the necessary elements
+            # and yield it. This example assumes the service answers with a JSON
+            # object holding the page elements under "results" and the cursor of
+            # the next page under "next", so that the yielded value matches
+            # `MyPageType` (a list of dicts).
+            yield body["results"]
+
+            # Get the next page parameter or end the loop when there is none
+            current = body.get("next")
+
+    def get_origins_from_page(self, page: MyPageType) -> Iterator[ListedOrigin]:
+        """Convert a page of My repositories into ListedOrigin objects"""
+        assert self.lister_obj.id is not None
+
+        for element in page:
+
+            yield ListedOrigin(
+                # Required. Should use this value.
+                lister_id=self.lister_obj.id,
+                # Visit type of the currently processed origin
+                visit_type=self.VISIT_TYPE,
+                # Required. URL corresponding best to the origin.
+                url=...,
+                # Should get it if the service provides it and if it induces no
+                # substantial additional processing cost
+                last_update=...,
+            )
+
+    def commit_page(self, page: MyPageType) -> None:
+        # Update the lister state to the latest `current`.
+        # An empty page carries no cursor: leave the state untouched instead of
+        # failing with an IndexError.
+        if not page:
+            return
+
+        current = page[-1]["current"]
+
+        if current > self.state.current:
+            self.state.current = current
+
+    def finalize(self) -> None:
+        # Pull fresh lister state from the scheduler backend, in case multiple
+        # listers run concurrently
+        scheduler_state = self.get_state_from_scheduler()
+
+        # Update the lister state in the backend only if `current` is fresher than
+        # the one stored in the database.
+        if self.state.current > scheduler_state.current:
+            self.updated = True