diff --git a/docs/new_lister_template.py b/docs/new_lister_template.py
new file mode 100644
--- /dev/null
+++ b/docs/new_lister_template.py
@@ -0,0 +1,158 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from dataclasses import asdict, dataclass
+import logging
+from typing import Any, Dict, Iterator, List
+from urllib.parse import urljoin
+
+import requests
+from tenacity.before_sleep import before_sleep_log
+
+from swh.lister.utils import throttling_retry
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.model import ListedOrigin
+
+from .. import USER_AGENT
+from ..pattern import CredentialsType, Lister
+
+logger = logging.getLogger(__name__)
+
+# A page is the list of origin entries returned by one API request
+MyPageType = List[Dict[str, Any]]
+
+
+@dataclass
+class MyListerState:
+    """State of my lister"""
+
+    current: str = ""
+    """Id of the last origin listed on an incremental pass"""
+
+
+class MyLister(Lister[MyListerState, MyPageType]):
+    """List origins from My.
+
+    This is a template: fill in the class attributes below and adapt
+    ``get_pages``, ``get_origins_from_page`` and ``commit_page`` to the
+    API of the forge being listed.
+    """
+
+    LISTER_NAME = ""
+    VISIT_TYPE = ""
+
+    DEFAULT_INSTANCE = ""
+    # Keep the trailing slash: `urljoin` replaces the last path segment of a
+    # base URL not ending with "/" ("https://netloc/api" + "origins/list"
+    # would give "https://netloc/origins/list").
+    DEFAULT_BASE_URL = "https://netloc/api/"
+
+    MAIN_PAGE = "origins/list"
+
+    def __init__(
+        self,
+        scheduler: SchedulerInterface,
+        url: str,
+        instance: str,
+        credentials: CredentialsType = None,
+    ):
+        super().__init__(
+            scheduler=scheduler, credentials=credentials, url=url, instance=instance,
+        )
+
+        # Shared session for all page requests, asking for JSON responses
+        self.session = requests.Session()
+        self.session.headers.update(
+            {"Accept": "application/json", "User-Agent": USER_AGENT}
+        )
+
+    def state_from_dict(self, d: Dict[str, Any]) -> MyListerState:
+        """Rebuild the lister state from its dict representation"""
+        return MyListerState(**d)
+
+    def state_to_dict(self, state: MyListerState) -> Dict[str, Any]:
+        """Serialize the lister state to a dict"""
+        return asdict(state)
+
+    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
+    def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
+        """GET ``url`` with query ``params`` and return the response.
+
+        Any non-200 status is logged as a warning; error statuses (4xx/5xx)
+        then raise :class:`requests.HTTPError` through ``raise_for_status``.
+        """
+        logger.debug("Fetching URL %s with params %s", url, params)
+        response = self.session.get(url, params=params)
+
+        if response.status_code != 200:
+            logger.warning(
+                "Unexpected HTTP status code %s on %s: %s",
+                response.status_code,
+                response.url,
+                response.content,
+            )
+        response.raise_for_status()
+
+        return response
+
+    def get_pages(self) -> Iterator[MyPageType]:
+        """Yield pages of origins, resuming from the cursor saved in the state"""
+
+        # init from saved state ("" starts the listing from the beginning)
+        current = self.state.current or ""
+
+        url = urljoin(self.url, self.MAIN_PAGE)
+
+        while current is not None:
+            body = self.page_request(url, {"current": current}).json()
+
+            # NOTE: this template assumes the API answers with a JSON object
+            # such as {"results": [...], "next": <cursor or null>}; adapt both
+            # lookups to the real response format. The yielded page must be a
+            # list (MyPageType): `get_origins_from_page` iterates over it and
+            # `commit_page` reads its last element.
+            page: MyPageType = body["results"]
+
+            # simplify page if possible and yield it
+            yield page
+
+            # next or end: a missing or null cursor stops the listing
+            current = body.get("next")
+
+    def get_origins_from_page(self, page: MyPageType) -> Iterator[ListedOrigin]:
+        """Yield one ListedOrigin per My repository found in ``page``"""
+        assert self.lister_obj.id is not None
+
+        for element in page:
+            # fill in the origin url and last update date from `element`
+            yield ListedOrigin(
+                lister_id=self.lister_obj.id,
+                visit_type=self.VISIT_TYPE,
+                url=...,
+                last_update=...,
+            )
+
+    def commit_page(self, page: MyPageType) -> None:
+        """Update the currently stored state using the latest listed page"""
+        if not page:
+            # empty page: nothing was listed, keep the current state
+            return
+
+        current = page[-1]["current"]
+
+        # NOTE: cursors are compared as strings, i.e. lexicographically
+        # ("9" > "10"): make sure they sort in listing order (e.g. zero-pad).
+        if current > self.state.current:
+            self.state.current = current
+
+    def finalize(self) -> None:
+        """Mark the state as updated when it is fresher than the scheduler's"""
+        # Pull fresh lister state from the scheduler backend
+        scheduler_state = self.get_state_from_scheduler()
+
+        # Update the lister state in the backend only if `current` is fresher than
+        # the one stored in the database.
+        if self.state.current > scheduler_state.current:
+            self.updated = True