Changeset View
Standalone View
swh/lister/npm/lister.py
# Copyright (C) 2018-2019 the Software Heritage developers | # Copyright (C) 2018-2021 the Software Heritage developers | |||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||||||||||||
from typing import Any, Dict, List, Optional | from dataclasses import asdict, dataclass | |||||||||||||||||||||
import logging | ||||||||||||||||||||||
from typing import Any, Dict, Iterator, List, Optional | ||||||||||||||||||||||
from requests import Response | import iso8601 | |||||||||||||||||||||
import requests | ||||||||||||||||||||||
from tenacity.before_sleep import before_sleep_log | ||||||||||||||||||||||
from swh.core import config | from swh.lister import USER_AGENT | |||||||||||||||||||||
from swh.lister.core.indexing_lister import IndexingHttpLister | from swh.lister.pattern import CredentialsType, Lister | |||||||||||||||||||||
from swh.lister.npm.models import NpmModel | from swh.lister.utils import throttling_retry | |||||||||||||||||||||
from swh.scheduler.utils import create_task_dict | from swh.scheduler.interface import SchedulerInterface | |||||||||||||||||||||
from swh.scheduler.model import ListedOrigin | ||||||||||||||||||||||
DEFAULT_CONFIG = { | logger = logging.getLogger(__name__) | |||||||||||||||||||||
"loading_task_policy": "recurring", | ||||||||||||||||||||||
} | ||||||||||||||||||||||
class NpmListerBase(IndexingHttpLister): | @dataclass | |||||||||||||||||||||
"""List packages available in the npm registry in a paginated way | class NpmListerState: | |||||||||||||||||||||
"""State of npm lister""" | ||||||||||||||||||||||
""" | last_seq: Optional[int] = None | |||||||||||||||||||||
MODEL = NpmModel | ||||||||||||||||||||||
LISTER_NAME = "npm" | ||||||||||||||||||||||
instance = "npm" | ||||||||||||||||||||||
def __init__( | ||||||||||||||||||||||
self, url="https://replicate.npmjs.com", per_page=1000, override_config=None | ||||||||||||||||||||||
): | ||||||||||||||||||||||
super().__init__(url=url, override_config=override_config) | ||||||||||||||||||||||
self.config = config.merge_configs(DEFAULT_CONFIG, self.config) | ||||||||||||||||||||||
self.per_page = per_page + 1 | ||||||||||||||||||||||
self.PATH_TEMPLATE += "&limit=%s" % self.per_page | ||||||||||||||||||||||
def get_model_from_repo(self, repo_name: str) -> Dict[str, str]: | ||||||||||||||||||||||
"""(Override) Transform from npm package name to model | ||||||||||||||||||||||
class NpmLister(Lister[NpmListerState, List[Dict[str, Any]]]): | ||||||||||||||||||||||
""" | """ | |||||||||||||||||||||
package_url = "https://www.npmjs.com/package/%s" % repo_name | List all packages hosted on the npm registry. | |||||||||||||||||||||
return { | ||||||||||||||||||||||
"uid": repo_name, | ||||||||||||||||||||||
"indexable": repo_name, | ||||||||||||||||||||||
"name": repo_name, | ||||||||||||||||||||||
"full_name": repo_name, | ||||||||||||||||||||||
"html_url": package_url, | ||||||||||||||||||||||
"origin_url": package_url, | ||||||||||||||||||||||
"origin_type": "npm", | ||||||||||||||||||||||
} | ||||||||||||||||||||||
def task_dict(self, origin_type: str, origin_url: str, **kwargs): | ||||||||||||||||||||||
"""(Override) Return task dict for loading a npm package into the | ||||||||||||||||||||||
archive. | ||||||||||||||||||||||
This is overridden from the lister_base as more information is | The lister is based on the npm replication API powered by a | |||||||||||||||||||||
needed for the ingestion task creation. | CouchDB database (https://docs.couchdb.org/en/stable/api/database/). | |||||||||||||||||||||
""" | Args: | |||||||||||||||||||||
task_type = "load-%s" % origin_type | scheduler: a scheduler instance | |||||||||||||||||||||
task_policy = self.config["loading_task_policy"] | page_size: number of packages info to return per page when querying npm API | |||||||||||||||||||||
return create_task_dict(task_type, task_policy, url=origin_url) | incremental: defines if incremental listing should be used, in that case | |||||||||||||||||||||
only modified or new packages since last incremental listing operation | ||||||||||||||||||||||
def request_headers(self) -> Dict[str, Any]: | will be returned, otherwise all packages will be listed in lexicographical | |||||||||||||||||||||
"""(Override) Set requests headers to send when querying the npm | order | |||||||||||||||||||||
registry. | ||||||||||||||||||||||
""" | """ | |||||||||||||||||||||
headers = super().request_headers() | ||||||||||||||||||||||
headers["Accept"] = "application/json" | ||||||||||||||||||||||
return headers | ||||||||||||||||||||||
def string_pattern_check(self, inner: int, lower: int, upper: int = None): | ||||||||||||||||||||||
""" (Override) Inhibit the effect of that method as packages indices | ||||||||||||||||||||||
correspond to package names and thus do not respect any kind | ||||||||||||||||||||||
of fixed length string pattern | ||||||||||||||||||||||
""" | ||||||||||||||||||||||
pass | ||||||||||||||||||||||
class NpmLister(NpmListerBase): | ||||||||||||||||||||||
"""List all packages available in the npm registry in a paginated way | ||||||||||||||||||||||
""" | ||||||||||||||||||||||
PATH_TEMPLATE = '/_all_docs?startkey="%s"' | ||||||||||||||||||||||
def get_next_target_from_response(self, response: Response) -> Optional[str]: | ||||||||||||||||||||||
"""(Override) Get next npm package name to continue the listing | ||||||||||||||||||||||
""" | LISTER_NAME = "npm" | |||||||||||||||||||||
repos = response.json()["rows"] | INSTANCE = "npm" | |||||||||||||||||||||
return repos[-1]["id"] if len(repos) == self.per_page else None | ||||||||||||||||||||||
def transport_response_simplified(self, response: Response) -> List[Dict[str, str]]: | ||||||||||||||||||||||
"""(Override) Transform npm registry response to list for model manipulation | ||||||||||||||||||||||
""" | API_BASE_URL = "https://replicate.npmjs.com" | |||||||||||||||||||||
repos = response.json()["rows"] | API_INCREMENTAL_LISTING_URL = f"{API_BASE_URL}/_changes" | |||||||||||||||||||||
if len(repos) == self.per_page: | API_FULL_LISTING_URL = f"{API_BASE_URL}/_all_docs" | |||||||||||||||||||||
repos = repos[:-1] | PACKAGE_URL_TEMPLATE = "https://www.npmjs.com/package/{package_name}" | |||||||||||||||||||||
return [self.get_model_from_repo(repo["id"]) for repo in repos] | ||||||||||||||||||||||
def __init__( | ||||||||||||||||||||||
self, | ||||||||||||||||||||||
scheduler: SchedulerInterface, | ||||||||||||||||||||||
page_size: int = 1000, | ||||||||||||||||||||||
incremental: bool = False, | ||||||||||||||||||||||
credentials: CredentialsType = None, | ||||||||||||||||||||||
): | ||||||||||||||||||||||
super().__init__( | ||||||||||||||||||||||
scheduler=scheduler, | ||||||||||||||||||||||
credentials=credentials, | ||||||||||||||||||||||
url=self.API_INCREMENTAL_LISTING_URL | ||||||||||||||||||||||
if incremental | ||||||||||||||||||||||
else self.API_FULL_LISTING_URL, | ||||||||||||||||||||||
instance=self.INSTANCE, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
self.page_size = page_size | ||||||||||||||||||||||
if not incremental: | ||||||||||||||||||||||
douardda: please explain this incremented page size | ||||||||||||||||||||||
Done Inline Actionsack anlambert: ack | ||||||||||||||||||||||
# in full listing mode, first package in each page corresponds to the one | ||||||||||||||||||||||
# provided as the startkey query parameter value, so we increment the page | ||||||||||||||||||||||
# size by one to avoid double package processing | ||||||||||||||||||||||
self.page_size += 1 | ||||||||||||||||||||||
self.incremental = incremental | ||||||||||||||||||||||
self.session = requests.Session() | ||||||||||||||||||||||
self.session.headers.update( | ||||||||||||||||||||||
{"Accept": "application/json", "User-Agent": USER_AGENT} | ||||||||||||||||||||||
) | ||||||||||||||||||||||
def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState: | ||||||||||||||||||||||
return NpmListerState(**d) | ||||||||||||||||||||||
def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]: | ||||||||||||||||||||||
Not Done Inline ActionsI guess it works also, but I'd prefer read {[...], "include_docs": True} Also a line of comment to tell why this flag is set would be helpful IMHO douardda: I guess it works also, but I'd prefer read `{[...], "include_docs": True}`
Also a line of… | ||||||||||||||||||||||
Done Inline ActionsUnfortunately, using True does not work here as the npm API expects the include_docs parameter value in lowercase only. anlambert: Unfortunately, using `True` does not work here as the npm API expects the `include_docs`… | ||||||||||||||||||||||
return asdict(state) | ||||||||||||||||||||||
def request_params(self, last_package_id: str) -> Dict[str, Any]: | ||||||||||||||||||||||
# include package JSON document to get its last update date | ||||||||||||||||||||||
params = {"limit": self.page_size, "include_docs": "true"} | ||||||||||||||||||||||
if self.incremental: | ||||||||||||||||||||||
params["since"] = last_package_id | ||||||||||||||||||||||
else: | ||||||||||||||||||||||
params["startkey"] = last_package_id | ||||||||||||||||||||||
return params | ||||||||||||||||||||||
@throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING)) | ||||||||||||||||||||||
def page_request(self, last_package_id: str) -> requests.Response: | ||||||||||||||||||||||
params = self.request_params(last_package_id) | ||||||||||||||||||||||
logger.debug("Fetching URL %s with params %s", self.url, params) | ||||||||||||||||||||||
response = self.session.get(self.url, params=params) | ||||||||||||||||||||||
if response.status_code != 200: | ||||||||||||||||||||||
logger.warning( | ||||||||||||||||||||||
"Unexpected HTTP status code %s on %s: %s", | ||||||||||||||||||||||
response.status_code, | ||||||||||||||||||||||
response.url, | ||||||||||||||||||||||
response.content, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
Not Done Inline Actionswhy a Union[int, str]? why not just an int (or Optional[int] if needed)? douardda: why a `Union[int, str]`? why not just an `int` (or `Optional[int]` if needed)? | ||||||||||||||||||||||
Done Inline ActionsI guess it was the end of my dev day and just wanted to make mypy happy. Indeed, a simple str type is enough here. In incremental mode, the last_package_id is an integer while in non incremental mode last_package_id corresponds to the name of the last listed package. Passing integer query parameter as string works so the union can go away. anlambert: I guess it was the end of my dev day and just wanted to make mypy happy.
Indeed, a simple… | ||||||||||||||||||||||
response.raise_for_status() | ||||||||||||||||||||||
return response | ||||||||||||||||||||||
def get_pages(self) -> Iterator[List[Dict[str, Any]]]: | ||||||||||||||||||||||
last_package_id: str = "0" if self.incremental else '""' | ||||||||||||||||||||||
if ( | ||||||||||||||||||||||
self.incremental | ||||||||||||||||||||||
and self.state is not None | ||||||||||||||||||||||
and self.state.last_seq is not None | ||||||||||||||||||||||
): | ||||||||||||||||||||||
last_package_id = str(self.state.last_seq) | ||||||||||||||||||||||
Not Done Inline Actionsno need for the response var here. Also body is a bit misleading (it usually refers to a the raw body of the response). I'd prefer a simple data = self.page_request(last_package_id).json() here douardda: no need for the `response` var here. Also `body` is a bit misleading (it usually refers to a… | ||||||||||||||||||||||
Done Inline ActionsBetter naming indeed anlambert: Better naming indeed | ||||||||||||||||||||||
class NpmIncrementalLister(NpmListerBase): | while True: | |||||||||||||||||||||
"""List packages in the npm registry, updated since a specific | ||||||||||||||||||||||
update_seq value of the underlying CouchDB database, in a paginated way. | ||||||||||||||||||||||
""" | response = self.page_request(last_package_id) | |||||||||||||||||||||
PATH_TEMPLATE = "/_changes?since=%s" | data = response.json() | |||||||||||||||||||||
page = data["results"] if self.incremental else data["rows"] | ||||||||||||||||||||||
@property | if not page: | |||||||||||||||||||||
def CONFIG_BASE_FILENAME(self): # noqa: N802 | break | |||||||||||||||||||||
return "lister_npm_incremental" | ||||||||||||||||||||||
def get_next_target_from_response(self, response: Response) -> Optional[str]: | if self.incremental or len(page) < self.page_size: | |||||||||||||||||||||
"""(Override) Get next npm package name to continue the listing. | yield page | |||||||||||||||||||||
else: | ||||||||||||||||||||||
yield page[:-1] | ||||||||||||||||||||||
Not Done Inline Actionssee above about the type of last_package_id, but this f-string looks weird here. Why not just keep an int everywhere? douardda: see above about the type of `last_package_id`, but this f-string looks weird here. Why not just… | ||||||||||||||||||||||
Done Inline Actionssee my comment above, in full listing mode last_package_id is a package name and it must be double quoted. anlambert: see my comment above, in full listing mode `last_package_id` is a package name and it must be… | ||||||||||||||||||||||
""" | if len(page) < self.page_size: | |||||||||||||||||||||
repos = response.json()["results"] | break | |||||||||||||||||||||
return repos[-1]["seq"] if len(repos) == self.per_page else None | ||||||||||||||||||||||
def transport_response_simplified(self, response: Response) -> List[Dict[str, str]]: | last_package_id = ( | |||||||||||||||||||||
"""(Override) Transform npm registry response to list for model | str(page[-1]["seq"]) if self.incremental else f'"{page[-1]["id"]}"' | |||||||||||||||||||||
Not Done Inline Actions
What about this? vlorentz: What about this? | ||||||||||||||||||||||
Done Inline ActionsThis could raise an error in full listing mode when processing the last page as seq field is missing in that case. anlambert: This could raise an error in full listing mode when processing the last page as `seq` field is… | ||||||||||||||||||||||
manipulation. | ) | |||||||||||||||||||||
""" | def get_origins_from_page( | |||||||||||||||||||||
repos = response.json()["results"] | self, page: List[Dict[str, Any]] | |||||||||||||||||||||
if len(repos) == self.per_page: | ) -> Iterator[ListedOrigin]: | |||||||||||||||||||||
repos = repos[:-1] | """Convert a page of Npm repositories into a list of ListedOrigin.""" | |||||||||||||||||||||
return [self.get_model_from_repo(repo["id"]) for repo in repos] | assert self.lister_obj.id is not None | |||||||||||||||||||||
def filter_before_inject(self, models_list: List[Dict[str, Any]]): | ||||||||||||||||||||||
"""(Override) Filter out documents in the CouchDB database | ||||||||||||||||||||||
not related to a npm package. | ||||||||||||||||||||||
""" | for package in page: | |||||||||||||||||||||
models_filtered = [] | # no source code to archive here | |||||||||||||||||||||
for model in models_list: | if not package["doc"].get("versions", {}): | |||||||||||||||||||||
package_name = model["name"] | ||||||||||||||||||||||
# document related to CouchDB internals | ||||||||||||||||||||||
if package_name.startswith("_design/"): | ||||||||||||||||||||||
continue | continue | |||||||||||||||||||||
models_filtered.append(model) | ||||||||||||||||||||||
return models_filtered | ||||||||||||||||||||||
def disable_deleted_repo_tasks(self, start, end, keep_these): | package_name = package["doc"]["name"] | |||||||||||||||||||||
"""(Override) Disable the processing performed by that method as it is | package_latest_version = ( | |||||||||||||||||||||
not relevant in this incremental lister context. It also raises an | package["doc"].get("dist-tags", {}).get("latest", "") | |||||||||||||||||||||
exception due to a different index type (int instead of str). | ) | |||||||||||||||||||||
""" | last_update = None | |||||||||||||||||||||
pass | if package_latest_version in package["doc"].get("time", {}): | |||||||||||||||||||||
last_update = iso8601.parse_date( | ||||||||||||||||||||||
package["doc"]["time"][package_latest_version] | ||||||||||||||||||||||
) | ||||||||||||||||||||||
yield ListedOrigin( | ||||||||||||||||||||||
lister_id=self.lister_obj.id, | ||||||||||||||||||||||
url=self.PACKAGE_URL_TEMPLATE.format(package_name=package_name), | ||||||||||||||||||||||
visit_type="npm", | ||||||||||||||||||||||
last_update=last_update, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
def commit_page(self, page: List[Dict[str, Any]]): | ||||||||||||||||||||||
"""Update the currently stored state using the latest listed page.""" | ||||||||||||||||||||||
if self.incremental: | ||||||||||||||||||||||
last_package = page[-1] | ||||||||||||||||||||||
last_seq = last_package["seq"] | ||||||||||||||||||||||
if self.state.last_seq is None or last_seq > self.state.last_seq: | ||||||||||||||||||||||
self.state.last_seq = last_seq | ||||||||||||||||||||||
def finalize(self): | ||||||||||||||||||||||
if self.incremental and self.state.last_seq is not None: | ||||||||||||||||||||||
Not Done Inline Actionswhy not swap these 2 lines? call self.get_state_from_scheduler() only if self.state.last_seq is not None douardda: why not swap these 2 lines? call `self.get_state_from_scheduler()` only if `self.state. | ||||||||||||||||||||||
Done Inline ActionsWell seen, I will rather merge the inverse condition in the first if as it is equivalent. anlambert: Well seen, I will rather merge the inverse condition in the first if as it is equivalent. | ||||||||||||||||||||||
scheduler_state = self.get_state_from_scheduler() | ||||||||||||||||||||||
if ( | ||||||||||||||||||||||
scheduler_state.last_seq is None | ||||||||||||||||||||||
or self.state.last_seq > scheduler_state.last_seq | ||||||||||||||||||||||
): | ||||||||||||||||||||||
self.updated = True |
please explain this incremented page size