Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/cpan/lister.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | |||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | |||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||
from collections import defaultdict | ||||||||||||
from datetime import datetime | ||||||||||||
import logging | import logging | |||||||||||
from typing import Any, Dict, Iterator, List, Optional | from typing import Any, Dict, Iterator, List, Optional, Set | |||||||||||
import iso8601 | ||||||||||||
from swh.scheduler.interface import SchedulerInterface | from swh.scheduler.interface import SchedulerInterface | |||||||||||
from swh.scheduler.model import ListedOrigin | from swh.scheduler.model import ListedOrigin | |||||||||||
from ..pattern import CredentialsType, StatelessLister | from ..pattern import CredentialsType, StatelessLister | |||||||||||
logger = logging.getLogger(__name__)

# Aliasing the page results returned by `get_pages` method from the lister.
# A page is the set of module (distribution) names collected while scrolling
# through the release documents.
CpanListerPage = Set[str]
def get_field_value(entry, field_name):
    """
    Splits ``field_name`` on ``.``, and use it as path in the nested ``entry``
    dictionary. If a value does not exist, returns None.

    The lookup starts from ``entry["_source"]``; a ``KeyError`` is raised if
    ``entry`` has no ``_source`` key. When the value found is a list, its first
    element is returned (or None if the list is empty). When a path component
    cannot be followed because the current value is not a dictionary, None is
    returned.

    >>> entry = {"_source": {"foo": 1, "bar": {"baz": 2, "qux": [3]}}}
    >>> get_field_value(entry, "foo")
    1
    >>> get_field_value(entry, "bar")
    {'baz': 2, 'qux': [3]}
    >>> get_field_value(entry, "bar.baz")
    2
    >>> get_field_value(entry, "bar.qux")
    3
    >>> get_field_value(entry, "missing.path") is None
    True
    >>> get_field_value({"_source": {"foo": []}}, "foo") is None
    True
    """
    field_value = entry["_source"]
    for field in field_name.split("."):
        # a missing or non-dict intermediate value means the path does not exist
        if not isinstance(field_value, dict):
            return None
        field_value = field_value.get(field)
    # scrolled results might have field value in a list
    if isinstance(field_value, list):
        field_value = field_value[0] if field_value else None
    return field_value
class CpanLister(StatelessLister[CpanListerPage]):
    """The Cpan lister list origins from 'Cpan', the Comprehensive Perl Archive
    Network.

    Release documents are fetched from the search API and grouped by
    distribution name while scrolling through the results. Once every result
    page has been processed, a single page holding all the module names seen
    is yielded and turned into one origin per module.
    """

    LISTER_NAME = "cpan"
    VISIT_TYPE = "cpan"
    INSTANCE = "cpan"

    API_BASE_URL = "https://fastapi.metacpan.org/v1"

    # Fields a release document must contain, otherwise the release is skipped.
    REQUIRED_DOC_FIELDS = [
        "download_url",
        "checksum_sha256",
        "distribution",
        "version",
    ]
    # Fields requested from the API but allowed to be absent from a document.
    OPTIONAL_DOC_FIELDS = ["date", "author", "stat.size", "name", "metadata.author"]

    ORIGIN_URL_PATTERN = "https://metacpan.org/dist/{module_name}"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        credentials: Optional[CredentialsType] = None,
    ):
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            instance=self.INSTANCE,
            url=self.API_BASE_URL,
        )

        # Per-module accumulators, keyed by distribution name and filled by
        # ``process_release_page`` while ``get_pages`` scrolls the results.
        self.artifacts: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        self.module_metadata: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        self.release_dates: Dict[str, List[datetime]] = defaultdict(list)
        self.module_names: Set[str] = set()

    def process_release_page(self, page: List[Dict[str, Any]]):
        """Record artifacts, metadata and release dates of a page of releases.

        Entries without a ``_source`` document, or whose document lacks one of
        ``REQUIRED_DOC_FIELDS``, are logged and skipped.

        Args:
            page: list of search hits, each expected to hold the release
                document under its ``_source`` key.
        """
        for entry in page:
            if "_source" not in entry or not all(
                k in entry["_source"] for k in self.REQUIRED_DOC_FIELDS
            ):
                logger.warning(
                    "Skipping release entry %s as some required fields are missing",
                    entry.get("_source"),
                )
                continue

            module_name = get_field_value(entry, "distribution")
            module_version = get_field_value(entry, "version")
            module_download_url = get_field_value(entry, "download_url")
            module_sha256_checksum = get_field_value(entry, "checksum_sha256")
            module_date = get_field_value(entry, "date")
            module_size = get_field_value(entry, "stat.size")
            module_author = get_field_value(entry, "author")
            module_author_fullname = get_field_value(entry, "metadata.author")
            release_name = get_field_value(entry, "name")

            self.artifacts[module_name].append(
                {
                    "url": module_download_url,
                    "filename": module_download_url.split("/")[-1],
                    "checksums": {"sha256": module_sha256_checksum},
                    "version": module_version,
                    "length": module_size,
                }
            )

            self.module_metadata[module_name].append(
                {
                    "name": module_name,
                    "version": module_version,
                    "cpan_author": module_author,
                    # fall back to the author identifier when no usable full
                    # name is available
                    "author": (
                        module_author_fullname
                        if module_author_fullname not in (None, "", "unknown")
                        else module_author
                    ),
                    "date": module_date,
                    "release_name": release_name,
                }
            )

            # "date" is an optional field: only parse it when it is present so
            # that a release without date does not abort the whole listing.
            if module_date is not None:
                self.release_dates[module_name].append(
                    iso8601.parse_date(module_date)
                )

            self.module_names.add(module_name)

    def get_pages(self) -> Iterator[CpanListerPage]:
        """Scroll through all release documents, then yield a single page
        made of the set of module names that were encountered."""

        endpoint = f"{self.API_BASE_URL}/release/_search"
        scrollendpoint = f"{self.API_BASE_URL}/_search/scroll"
        size = 1000

        res = self.http_request(
            endpoint,
            params={
                "_source": self.REQUIRED_DOC_FIELDS + self.OPTIONAL_DOC_FIELDS,
                "size": size,
                "scroll": "1m",
            },
        )
        # decode the response body once and reuse it
        body = res.json()
        data = body["hits"]["hits"]
        self.process_release_page(data)
        _scroll_id = body["_scroll_id"]

        # keep scrolling until a page without any hit is returned
        while data:
            scroll_res = self.http_request(
                scrollendpoint, params={"scroll": "1m", "scroll_id": _scroll_id}
            )
            body = scroll_res.json()
            data = body["hits"]["hits"]
            _scroll_id = body["_scroll_id"]
            self.process_release_page(data)

        yield self.module_names

    def get_origins_from_page(
        self, module_names: CpanListerPage
    ) -> Iterator[ListedOrigin]:
        """Iterate on all pages and yield ListedOrigin instances."""
        assert self.lister_obj.id is not None

        for module_name in module_names:
            # ``.get`` avoids creating an empty entry in the defaultdict; a
            # module may have no recorded date since "date" is optional.
            release_dates = self.release_dates.get(module_name)

            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                visit_type=self.VISIT_TYPE,
                url=self.ORIGIN_URL_PATTERN.format(module_name=module_name),
                last_update=max(release_dates) if release_dates else None,
                extra_loader_arguments={
                    "api_base_url": self.API_BASE_URL,
                    "artifacts": self.artifacts[module_name],
                    "module_metadata": self.module_metadata[module_name],
                },
            )
and move it outside the class