Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/conda/lister.py
- This file was added.
# Copyright (C) 2022 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
anlambert: Please add an empty line after the license header.
import bz2 | |||||
from collections import defaultdict | |||||
import datetime | |||||
import json | |||||
import logging | |||||
from typing import Any, Dict, Iterator, List, Optional, Tuple | |||||
import iso8601 | |||||
import requests | |||||
from tenacity.before_sleep import before_sleep_log | |||||
from swh.lister.utils import throttling_retry | |||||
from swh.scheduler.interface import SchedulerInterface | |||||
from swh.scheduler.model import ListedOrigin | |||||
from .. import __version__ | |||||
from ..pattern import CredentialsType, StatelessLister | |||||
# Value of the "User-Agent" HTTP header; it embeds the package version
# imported as ``__version__``.
USER_AGENT = (
    f"Software Heritage Conda Lister v{__version__} "
    "(+https://www.softwareheritage.org/contact)"
)
anlambert (inline comment, marked done): You can remove the user agent setting code; it is now handled in the base lister class.
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Aliasing the page results returned by `get_pages` method from the lister:
# an ``(arch, packages)`` pair, where ``packages`` maps an archive filename
# to the metadata dict describing that archive.
CondaListerPage = Tuple[str, Dict[str, Dict[str, Any]]]
class CondaLister(StatelessLister[CondaListerPage]):
    """List Conda (anaconda.com) origins.

    For each requested architecture, the lister downloads the bz2-compressed
    ``repodata.json`` index of the configured channel, and turns every
    archive described there into an artifact attached to the origin of the
    package it belongs to.
    """

    LISTER_NAME = "conda"
    VISIT_TYPE = "conda"
    INSTANCE = "conda"
    BASE_REPO_URL = "https://repo.anaconda.com/pkgs"
    # Compressed package index of one channel/architecture pair.
    REPO_URL_PATTERN = "{url}/{channel}/{arch}/repodata.json.bz2"
    # URL used as the origin of a package within a channel.
    ORIGIN_URL_PATTERN = "https://anaconda.org/{channel}/{pkgname}"
    # Download URL of a single package archive.
    ARCHIVE_URL_PATTERN = "{url}/{channel}/{arch}/{filename}"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        credentials: Optional[CredentialsType] = None,
        url: str = BASE_REPO_URL,
        channel: str = "",
        archs: Optional[List[str]] = None,
    ):
        """Set up the lister state and the HTTP session.

        Args:
            scheduler: forwarded to the base lister constructor.
            credentials: forwarded to the base lister constructor.
            url: base URL of the package repository.
            channel: name of the channel to list.
            archs: architectures whose index must be listed; ``None`` (the
                default) means no architecture, i.e. nothing is listed.
        """
        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            instance=self.INSTANCE,
            url=url,
        )
        self.channel: str = channel
        # Copy into a fresh list so that neither a default value nor the
        # caller's own list ends up shared between lister instances.
        self.archs: List[str] = [] if archs is None else list(archs)
        # package name -> {"<arch>/<version>-<build>": artifact}
        self.packages: Dict[str, Any] = defaultdict(dict)
        # package name -> dates of every archive seen so far for it
        self.package_dates: Dict[str, Any] = defaultdict(list)
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Accept": "application/json",
                "User-Agent": USER_AGENT,
            }
        )

    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
    def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:
        """Issue a GET request on ``url`` through the lister session.

        Args:
            url: URL to fetch.
            params: query parameters passed to ``requests``.

        Returns:
            The response. Any status other than 200 is logged as a warning
            before ``raise_for_status`` is called on the response.

        Raises:
            requests.HTTPError: raised by ``raise_for_status`` on error
                statuses.
        """
        logger.debug("Fetching URL %s with params %s", url, params)

        response = self.session.get(url, params=params)
        if response.status_code != 200:
            logger.warning(
                "Unexpected HTTP status code %s on %s: %s",
                response.status_code,
                response.url,
                response.content,
            )
        response.raise_for_status()

        return response

    def get_pages(self) -> Iterator[CondaListerPage]:
        """Yield one ``(arch, packages)`` page per configured architecture.

        ``packages`` is the content of the ``"packages"`` key of the
        repository index fetched for that architecture.
        """
        for arch in self.archs:
            repodata_url = self.REPO_URL_PATTERN.format(
                url=self.url, channel=self.channel, arch=arch
            )
            response = self.page_request(url=repodata_url, params={})
            # The index is downloaded bz2-compressed: inflate before parsing.
            packages = json.loads(bz2.decompress(response.content))["packages"]
            yield (arch, packages)

    def get_origins_from_page(self, page: CondaListerPage) -> Iterator[ListedOrigin]:
        """Yield a ListedOrigin for every archive listed in ``page``.

        Artifacts and dates are accumulated on the lister instance, keyed by
        package name. Each yielded origin therefore carries all the artifacts
        recorded so far for its package, and ``last_update`` is the most
        recent date recorded so far (``None`` when the archive has no date).

        Args:
            page: an ``(arch, packages)`` pair as yielded by ``get_pages``.
        """
        assert self.lister_obj.id is not None
        arch, packages = page

        for filename, package_metadata in packages.items():
            version = package_metadata["version"]
            artifact = {
                "filename": filename,
                "url": self.ARCHIVE_URL_PATTERN.format(
                    url=self.url,
                    channel=self.channel,
                    filename=filename,
                    arch=arch,
                ),
                "version": version,
                # Only the checksums actually present in the metadata.
                "checksums": {
                    checksum: package_metadata[checksum]
                    for checksum in ("md5", "sha256")
                    if checksum in package_metadata
                },
            }

            version_key = f"{arch}/{version}-{package_metadata['build']}"
            name = package_metadata["name"]
            self.packages[name][version_key] = artifact

            package_date = None
            if "timestamp" in package_metadata:
                # The timestamp is divided by 1000 before conversion, i.e. it
                # is treated as milliseconds since the epoch.
                package_date = datetime.datetime.fromtimestamp(
                    package_metadata["timestamp"] / 1e3, datetime.timezone.utc
                )
            elif "date" in package_metadata:
                package_date = iso8601.parse_date(package_metadata["date"])

            last_update = None
            if package_date:
                artifact["date"] = package_date.isoformat()
                self.package_dates[name].append(package_date)
                last_update = max(self.package_dates[name])

            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                visit_type=self.VISIT_TYPE,
                url=self.ORIGIN_URL_PATTERN.format(
                    channel=self.channel, pkgname=name
                ),
                last_update=last_update,
                extra_loader_arguments={
                    "artifacts": self.packages[name],
                },
            )
Please add an empty line after license header.