Page MenuHomeSoftware Heritage

D4927.diff
No OneTemporary

D4927.diff

diff --git a/docs/new_lister_template.py b/docs/new_lister_template.py
new file mode 100644
--- /dev/null
+++ b/docs/new_lister_template.py
@@ -0,0 +1,166 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import asdict, dataclass
+import logging
+from typing import Any, Dict, Iterator, List
+from urllib.parse import urljoin
+
+import requests
+from tenacity.before_sleep import before_sleep_log
+
+from swh.lister.utils import throttling_retry
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.model import ListedOrigin
+
+from .. import USER_AGENT
+from ..pattern import CredentialsType, Lister
+
+logger = logging.getLogger(__name__)
+
+# Type alias of a page as returned by `get_pages`. Optional, but good for readability.
+MyPageType = List[Dict[str, Any]]
+
+
+@dataclass
+class MyListerState:
+ """State of My lister"""
+
+ # Holds the state that need to be kept to do incremental listing
+
+ current: str = ""
+ """Id of the last origin listed on an incremental pass"""
+
+
+# If there is no need to keep state, subclass StatelessLister[MyPageType]
+class MyLister(Lister[MyListerState, MyPageType]):
+ """List origins from My software source.
+
+ """
+
+ # Part of the lister API, that identifies this lister
+ LISTER_NAME = ""
+ # Not required, CVS type of the origins listed by this lister, if constant
+ VISIT_TYPE = ""
+
+ # Instance URLs include the hostname and the common path prefix of processed URLs
+ EXAMPLE_BASE_URL = "https://netloc/api"
+ # Path of a specific resource to process, to join the base URL with
+ EXAMPLE_PATH = "origins/list"
+
+ def __init__(
+ self,
+ # Required
+ scheduler: SchedulerInterface,
+ # Instance URL, required for multi-instance listers
+ url: str,
+ # Instance name (free form) required for multi-instance listers,
+ # or computed from `url`
+ instance: str,
+ # Must declare this argument if the lister supports authentication
+ credentials: CredentialsType = None,
+ ):
+ super().__init__(
+ scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+ )
+
+ # not required to set up a session, but must use the user-agent
+ self.session = requests.Session()
+ self.session.headers.update(
+ {"Accept": "application/json", "User-Agent": USER_AGENT}
+ )
+
+ def state_from_dict(self, d: Dict[str, Any]) -> MyListerState:
+ return MyListerState(**d)
+
+ def state_to_dict(self, state: MyListerState) -> Dict[str, Any]:
+ return asdict(state)
+
+ @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
+ def page_request(self, url, params) -> requests.Response:
+ # Do the network resource request under a retrying decorator
+ # to handle rate limiting and transient errors up to a limit.
+ # `throttling_retry` by default use the `requests` library to check
+ # only for rate-limit and a base-10 exponential waiting strategy.
+ # This can be customized by passed waiting, retrying and logging strategies
+ # as functions. See the `tenacity` library documentation.
+
+ # Log URL to fetch to ease debugging
+ logger.debug("Fetching URL %s with params %s", url, params)
+ response = self.session.get(url, params=params)
+
+ if response.status_code != 200:
+ # Log response content to ease debugging
+ logger.warning(
+ "Unexpected HTTP status code %s on %s: %s",
+ response.status_code,
+ response.url,
+ response.content,
+ )
+ # The lister must fail on blocking errors
+ response.raise_for_status()
+
+ return response
+
+ def get_pages(self) -> Iterator[MyPageType]:
+ # The algorithm depends on the service, but should request data reliably,
+ # following pagination if relevant and yielding pages in a streaming fashion.
+ # If incremental listing is supported, initialize from saved lister state.
+ # Make use of any next page URL provided.
+ # Simplify the results early to ease testing and debugging.
+
+ # Initialize from the lister saved state
+ current = ""
+ if self.state.current is not None:
+ current = self.state.current
+
+ # Construct the URL of a service endpoint, the lister can have others to fetch
+ url = urljoin(self.url, self.EXAMPLE_PATH)
+
+ while current is not None:
+ # Parametrize the request for incremental listing
+ body = self.page_request(url, {"current": current}).json()
+
+ # Simplify the page if possible to only the necessary elements
+ # and yield it
+ yield body
+
+ # Get the next page parameter or end the loop when there is none
+ current = body.get("next")
+
+ def get_origins_from_page(self, page: MyPageType) -> Iterator[ListedOrigin]:
+ """Convert a page of My repositories into a list of ListedOrigins"""
+ assert self.lister_obj.id is not None
+
+ for element in page:
+
+ yield ListedOrigin(
+ # Required. Should use this value.
+ lister_id=self.lister_obj.id,
+ # Visit type of the currently processed origin
+ visit_type=self.VISIT_TYPE,
+ # Required. URL corresponding best to the origin.
+ url=...,
+ # Should get it if the service provides it and if it induces no
+ # substantial additional processing cost
+ last_update=...,
+ )
+
+ def commit_page(self, page: MyPageType) -> None:
+ # Update the lister state to the latest `current`
+ current = page[-1]["current"]
+
+ if current > self.state.current:
+ self.state.current = current
+
+ def finalize(self) -> None:
+ # Pull fresh lister state from the scheduler backend, in case multiple
+ # listers run concurrently
+ scheduler_state = self.get_state_from_scheduler()
+
+ # Update the lister state in the backend only if `current` is fresher than
+ # the one stored in the database.
+ if self.state.current > scheduler_state.current:
+ self.updated = True

File Metadata

Mime Type
text/plain
Expires
Wed, Dec 18, 10:38 PM (22 h, 42 m ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224015

Event Timeline