Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/npm/lister.py
# Copyright (C) 2018-2021 the Software Heritage developers | # Copyright (C) 2018-2022 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from dataclasses import asdict, dataclass | from dataclasses import asdict, dataclass | ||||
import logging | import logging | ||||
from typing import Any, Dict, Iterator, List, Optional | from typing import Any, Dict, Iterator, List, Optional | ||||
import iso8601 | import iso8601 | ||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister import USER_AGENT | |||||
from swh.lister.pattern import CredentialsType, Lister | from swh.lister.pattern import CredentialsType, Lister | ||||
from swh.lister.utils import http_retry | |||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | ||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
@dataclass | @dataclass | ||||
class NpmListerState: | class NpmListerState: | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | ): | ||||
self.page_size = page_size | self.page_size = page_size | ||||
if not incremental: | if not incremental: | ||||
# in full listing mode, first package in each page corresponds to the one | # in full listing mode, first package in each page corresponds to the one | ||||
# provided as the startkey query parameter value, so we increment the page | # provided as the startkey query parameter value, so we increment the page | ||||
# size by one to avoid double package processing | # size by one to avoid double package processing | ||||
self.page_size += 1 | self.page_size += 1 | ||||
self.incremental = incremental | self.incremental = incremental | ||||
self.session = requests.Session() | self.session.headers.update({"Accept": "application/json"}) | ||||
self.session.headers.update( | |||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | |||||
) | |||||
def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState:
    """Rebuild a lister state object from its dictionary form.

    Every entry of ``d`` is passed to ``NpmListerState`` as a keyword
    argument.
    """
    restored = NpmListerState(**d)
    return restored
def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]:
    """Convert a lister state object into a plain dictionary.

    The conversion is delegated to ``dataclasses.asdict``.
    """
    serialized = asdict(state)
    return serialized
def request_params(self, last_package_id: str) -> Dict[str, Any]:
    """Build the query parameters for a single page request.

    ``last_package_id`` is sent under the ``since`` key when the lister runs
    in incremental mode and under the ``startkey`` key otherwise. The page
    size is always sent as ``limit``.
    """
    # the key carrying the pagination cursor depends on the listing mode
    cursor_key = "since" if self.incremental else "startkey"
    return {
        "limit": self.page_size,
        # include package JSON document to get its last update date
        "include_docs": "true",
        cursor_key: last_package_id,
    }
@http_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
def page_request(self, last_package_id: str) -> requests.Response:
    """Fetch one page of results from ``self.url``.

    The query parameters come from ``request_params(last_package_id)`` and
    the request is issued through ``self.session``. Any status code other
    than 200 is logged at warning level together with the response URL and
    body; ``raise_for_status()`` is then called so that HTTP error statuses
    surface as exceptions. The retry behaviour is whatever the ``http_retry``
    decorator implements (not visible from this block).
    """
    query = self.request_params(last_package_id)
    logger.debug("Fetching URL %s with params %s", self.url, query)

    response = self.session.get(self.url, params=query)

    got_expected_status = response.status_code == 200
    if not got_expected_status:
        logger.warning(
            "Unexpected HTTP status code %s on %s: %s",
            response.status_code,
            response.url,
            response.content,
        )

    # always invoked, as before: it is a no-op for non-error statuses
    response.raise_for_status()
    return response
def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | ||||
last_package_id: str = "0" if self.incremental else '""' | last_package_id: str = "0" if self.incremental else '""' | ||||
if ( | if ( | ||||
self.incremental | self.incremental | ||||
and self.state is not None | and self.state is not None | ||||
and self.state.last_seq is not None | and self.state.last_seq is not None | ||||
): | ): | ||||
last_package_id = str(self.state.last_seq) | last_package_id = str(self.state.last_seq) | ||||
while True: | while True: | ||||
response = self.page_request(last_package_id) | response = self.http_request( | ||||
self.url, params=self.request_params(last_package_id) | |||||
) | |||||
data = response.json() | data = response.json() | ||||
page = data["results"] if self.incremental else data["rows"] | page = data["results"] if self.incremental else data["rows"] | ||||
if not page: | if not page: | ||||
break | break | ||||
if self.incremental or len(page) < self.page_size: | if self.incremental or len(page) < self.page_size: | ||||
▲ Show 20 Lines • Show All 58 Lines • Show Last 20 Lines |