Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/npm/loader.py
Show All 9 Lines | |||||
from codecs import BOM_UTF8 | from codecs import BOM_UTF8 | ||||
from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union | from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union | ||||
import attr | import attr | ||||
import chardet | import chardet | ||||
from urllib.parse import quote | from urllib.parse import quote | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
MetadataAuthority, | |||||
MetadataAuthorityType, | |||||
Person, | Person, | ||||
RevisionType, | RevisionType, | ||||
Revision, | Revision, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.loader.package.loader import BasePackageInfo, PackageLoader | from swh.loader.package.loader import ( | ||||
BasePackageInfo, | |||||
PackageLoader, | |||||
RawExtrinsicMetadataCore, | |||||
) | |||||
from swh.loader.package.utils import api_info, release_name | from swh.loader.package.utils import api_info, release_name | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
EMPTY_PERSON = Person(fullname=b"", name=None, email=None) | EMPTY_PERSON = Person(fullname=b"", name=None, email=None) | ||||
▲ Show 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | def __init__(self, url: str): | ||||
self._versions = None | self._versions = None | ||||
@property | @property | ||||
def info(self) -> Dict[str, Any]: | def info(self) -> Dict[str, Any]: | ||||
"""Return the project metadata information (fetched from npm registry) | """Return the project metadata information (fetched from npm registry) | ||||
""" | """ | ||||
if not self._info: | if not self._info: | ||||
self._info = json.loads(api_info(self.provider_url)) | self._raw_info = api_info(self.provider_url) | ||||
self._info = json.loads(self._raw_info) | |||||
return self._info | return self._info | ||||
def get_versions(self) -> Sequence[str]: | def get_versions(self) -> Sequence[str]: | ||||
return sorted(list(self.info["versions"].keys())) | return sorted(list(self.info["versions"].keys())) | ||||
def get_default_version(self) -> str: | def get_default_version(self) -> str: | ||||
return self.info["dist-tags"].get("latest", "") | return self.info["dist-tags"].get("latest", "") | ||||
def get_metadata_authority(self): | |||||
return MetadataAuthority( | |||||
type=MetadataAuthorityType.FORGE, url="https://npmjs.com/", metadata={}, | |||||
) | |||||
def get_extrinsic_snapshot_metadata(self): | |||||
return [ | |||||
RawExtrinsicMetadataCore( | |||||
format="replicate-npm-package-json", | |||||
metadata=self._raw_info, | |||||
discovery_date=None, | |||||
), | |||||
] | |||||
def get_package_info(self, version: str) -> Iterator[Tuple[str, NpmPackageInfo]]: | def get_package_info(self, version: str) -> Iterator[Tuple[str, NpmPackageInfo]]: | ||||
p_info = NpmPackageInfo.from_metadata( | p_info = NpmPackageInfo.from_metadata( | ||||
project_metadata=self.info, version=version | project_metadata=self.info, version=version | ||||
) | ) | ||||
yield release_name(version), p_info | yield release_name(version), p_info | ||||
def resolve_revision_from( | def resolve_revision_from( | ||||
self, known_artifacts: Dict, p_info: NpmPackageInfo | self, known_artifacts: Dict, p_info: NpmPackageInfo | ||||
▲ Show 20 Lines • Show All 208 Lines • Show Last 20 Lines |