Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/npm/lister.py
# Copyright (C) 2018-2019 the Software Heritage developers | # Copyright (C) 2018-2019 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from swh.lister.core.indexing_lister import IndexingHttpLister | from swh.lister.core.indexing_lister import IndexingHttpLister | ||||
from swh.lister.npm.models import NpmModel | from swh.lister.npm.models import NpmModel | ||||
from swh.scheduler.utils import create_task_dict | from swh.scheduler.utils import create_task_dict | ||||
from typing import Any, Dict, Optional, List | |||||
from requests import Response | |||||
class NpmListerBase(IndexingHttpLister): | class NpmListerBase(IndexingHttpLister): | ||||
"""List packages available in the npm registry in a paginated way | """List packages available in the npm registry in a paginated way | ||||
""" | """ | ||||
MODEL = NpmModel | MODEL = NpmModel | ||||
LISTER_NAME = 'npm' | LISTER_NAME = 'npm' | ||||
instance = 'npm' | instance = 'npm' | ||||
def __init__(self, url='https://replicate.npmjs.com', | def __init__(self, url='https://replicate.npmjs.com', | ||||
per_page=1000, override_config=None): | per_page=1000, override_config=None): | ||||
super().__init__(url=url, override_config=override_config) | super().__init__(url=url, override_config=override_config) | ||||
self.per_page = per_page + 1 | self.per_page = per_page + 1 | ||||
self.PATH_TEMPLATE += '&limit=%s' % self.per_page | self.PATH_TEMPLATE += '&limit=%s' % self.per_page | ||||
@property | @property | ||||
def ADDITIONAL_CONFIG(self): | def ADDITIONAL_CONFIG(self) -> Dict[str, Any]: | ||||
vlorentz: You can be more specific than `Any`. | |||||
"""(Override) Add extra configuration | """(Override) Add extra configuration | ||||
""" | """ | ||||
default_config = super().ADDITIONAL_CONFIG | default_config = super().ADDITIONAL_CONFIG | ||||
default_config['loading_task_policy'] = ('str', 'recurring') | default_config['loading_task_policy'] = ('str', 'recurring') | ||||
return default_config | return default_config | ||||
def get_model_from_repo(self, repo_name): | def get_model_from_repo(self, repo_name: str) -> Dict[str, str]: | ||||
"""(Override) Transform from npm package name to model | """(Override) Transform from npm package name to model | ||||
""" | """ | ||||
package_url = 'https://www.npmjs.com/package/%s' % repo_name | package_url = 'https://www.npmjs.com/package/%s' % repo_name | ||||
return { | return { | ||||
'uid': repo_name, | 'uid': repo_name, | ||||
'indexable': repo_name, | 'indexable': repo_name, | ||||
'name': repo_name, | 'name': repo_name, | ||||
'full_name': repo_name, | 'full_name': repo_name, | ||||
'html_url': package_url, | 'html_url': package_url, | ||||
'origin_url': package_url, | 'origin_url': package_url, | ||||
'origin_type': 'npm', | 'origin_type': 'npm', | ||||
} | } | ||||
def task_dict(self, origin_type, origin_url, **kwargs): | def task_dict(self, origin_type: str, origin_url: str, **kwargs): | ||||
"""(Override) Return task dict for loading a npm package into the | """(Override) Return task dict for loading a npm package into the | ||||
archive. | archive. | ||||
This is overridden from the lister_base as more information is | This is overridden from the lister_base as more information is | ||||
needed for the ingestion task creation. | needed for the ingestion task creation. | ||||
""" | """ | ||||
task_type = 'load-%s' % origin_type | task_type = 'load-%s' % origin_type | ||||
task_policy = self.config['loading_task_policy'] | task_policy = self.config['loading_task_policy'] | ||||
return create_task_dict(task_type, task_policy, | return create_task_dict(task_type, task_policy, | ||||
url=origin_url) | url=origin_url) | ||||
def request_headers(self): | def request_headers(self) -> Dict[str, Any]: | ||||
vlorentz (inline comment, not done): You can probably be more specific than `Any`. | |||||
"""(Override) Set requests headers to send when querying the npm | """(Override) Set requests headers to send when querying the npm | ||||
registry. | registry. | ||||
""" | """ | ||||
headers = super().request_headers() | headers = super().request_headers() | ||||
headers['Accept'] = 'application/json' | headers['Accept'] = 'application/json' | ||||
return headers | return headers | ||||
def string_pattern_check(self, inner, lower, upper=None): | def string_pattern_check(self, inner: int, lower: int, upper: int = None): | ||||
""" (Override) Inhibit the effect of that method as packages indices | """ (Override) Inhibit the effect of that method as packages indices | ||||
correspond to package names and thus do not respect any kind | correspond to package names and thus do not respect any kind | ||||
of fixed length string pattern | of fixed length string pattern | ||||
""" | """ | ||||
pass | pass | ||||
class NpmLister(NpmListerBase): | class NpmLister(NpmListerBase): | ||||
"""List all packages available in the npm registry in a paginated way | """List all packages available in the npm registry in a paginated way | ||||
""" | """ | ||||
PATH_TEMPLATE = '/_all_docs?startkey="%s"' | PATH_TEMPLATE = '/_all_docs?startkey="%s"' | ||||
def get_next_target_from_response(self, response): | def get_next_target_from_response( | ||||
self, response: Response) -> Optional[str]: | |||||
"""(Override) Get next npm package name to continue the listing | """(Override) Get next npm package name to continue the listing | ||||
""" | """ | ||||
repos = response.json()['rows'] | repos = response.json()['rows'] | ||||
return repos[-1]['id'] if len(repos) == self.per_page else None | return repos[-1]['id'] if len(repos) == self.per_page else None | ||||
def transport_response_simplified(self, response): | def transport_response_simplified( | ||||
self, response: Response) -> List[Dict[str, str]]: | |||||
"""(Override) Transform npm registry response to list for model manipulation | """(Override) Transform npm registry response to list for model manipulation | ||||
""" | """ | ||||
repos = response.json()['rows'] | repos = response.json()['rows'] | ||||
if len(repos) == self.per_page: | if len(repos) == self.per_page: | ||||
repos = repos[:-1] | repos = repos[:-1] | ||||
return [self.get_model_from_repo(repo['id']) for repo in repos] | return [self.get_model_from_repo(repo['id']) for repo in repos] | ||||
class NpmIncrementalLister(NpmListerBase): | class NpmIncrementalLister(NpmListerBase): | ||||
"""List packages in the npm registry, updated since a specific | """List packages in the npm registry, updated since a specific | ||||
update_seq value of the underlying CouchDB database, in a paginated way. | update_seq value of the underlying CouchDB database, in a paginated way. | ||||
""" | """ | ||||
PATH_TEMPLATE = '/_changes?since=%s' | PATH_TEMPLATE = '/_changes?since=%s' | ||||
@property | @property | ||||
def CONFIG_BASE_FILENAME(self): # noqa: N802 | def CONFIG_BASE_FILENAME(self): # noqa: N802 | ||||
return 'lister_npm_incremental' | return 'lister_npm_incremental' | ||||
def get_next_target_from_response(self, response): | def get_next_target_from_response( | ||||
self, response: Response) -> Optional[str]: | |||||
"""(Override) Get next npm package name to continue the listing. | """(Override) Get next npm package name to continue the listing. | ||||
""" | """ | ||||
repos = response.json()['results'] | repos = response.json()['results'] | ||||
return repos[-1]['seq'] if len(repos) == self.per_page else None | return repos[-1]['seq'] if len(repos) == self.per_page else None | ||||
def transport_response_simplified(self, response): | def transport_response_simplified( | ||||
self, response: Response) -> List[Dict[str, str]]: | |||||
"""(Override) Transform npm registry response to list for model | """(Override) Transform npm registry response to list for model | ||||
manipulation. | manipulation. | ||||
""" | """ | ||||
repos = response.json()['results'] | repos = response.json()['results'] | ||||
if len(repos) == self.per_page: | if len(repos) == self.per_page: | ||||
repos = repos[:-1] | repos = repos[:-1] | ||||
return [self.get_model_from_repo(repo['id']) for repo in repos] | return [self.get_model_from_repo(repo['id']) for repo in repos] | ||||
def filter_before_inject(self, models_list): | def filter_before_inject(self, models_list: List[Dict[str, Any]]): | ||||
vlorentz (inline comment, not done): Missing return type. | |||||
"""(Override) Filter out documents in the CouchDB database | """(Override) Filter out documents in the CouchDB database | ||||
not related to a npm package. | not related to a npm package. | ||||
""" | """ | ||||
models_filtered = [] | models_filtered = [] | ||||
for model in models_list: | for model in models_list: | ||||
package_name = model['name'] | package_name = model['name'] | ||||
# document related to CouchDB internals | # document related to CouchDB internals | ||||
Show All 12 Lines |
You can be more specific than Any.