diff --git a/swh/loader/package/crates/loader.py b/swh/loader/package/crates/loader.py
index a2ebc2b..2943ae9 100644
--- a/swh/loader/package/crates/loader.py
+++ b/swh/loader/package/crates/loader.py
@@ -1,354 +1,354 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from distutils.version import StrictVersion
 import json
 from pathlib import Path
 from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple
 from urllib.parse import urlparse

 import attr
 import toml
 from typing_extensions import TypedDict

 from swh.loader.package.loader import BasePackageInfo, PackageLoader
-from swh.loader.package.utils import api_info, cached_method, release_name
+from swh.loader.package.utils import cached_method, get_url_body, release_name
 from swh.model.model import ObjectType, Person, Release, Sha1Git, TimestampWithTimezone
 from swh.storage.interface import StorageInterface


 class ExtrinsicPackageMetadata(TypedDict):
     """Data structure for package extrinsic metadata pulled from http api endpoint.

     We set only the keys we need according to what is available when querying
     https://crates.io/api/v1/crates/<name>, where `name` is the name of the crate
     package (see JSON response example at https://crates.io/api/v1/crates/hg-core).

     Usage example:

     .. code-block:: python

         e_metadata = ExtrinsicPackageMetadata(**self.info())

     """  # noqa

     categories: List[Dict[Any, Any]]
     """Related categories"""

     crate: Dict[Any, Any]
     """Crate project information"""

     keywords: List[Any]
     """Keywords"""

     versions: List[Dict[Any, Any]]
     """A list of released versions for a crate"""


 class ExtrinsicVersionPackageMetadata(TypedDict):
     """Data structure for specific package version extrinsic metadata, pulled
     from http api endpoint.

     Similar to `ExtrinsicPackageMetadata` in its usage, but we flatten the
     data related to a specific version.
     """

     crate: str
     """The package name"""

     crate_size: int
     """The package size"""

     created_at: str
     """First released at"""

     downloads: str
     """Number of downloads"""

     license: str
     """Package license"""

     num: str
     """Package version"""

     published_by: Dict[Any, Any]
     """Publishers information"""

     updated_at: str
     """Last update"""

     yanked: bool
     """Is that version yanked? (yanked means release-level deprecation)"""


 class IntrinsicPackageMetadata(TypedDict):
     """Data structure for specific package version intrinsic metadata.

     Data is extracted from the crate package's .toml file. Then the data of
     the 'package' entry is flattened.

     Cargo.toml file content example:

     .. code-block:: toml

         [package]
         name = "hg-core"
         version = "0.0.1"
         authors = ["Georges Racinet <georges.racinet@octobus.net>"]
         description = "Mercurial pure Rust core library, with no assumption on Python bindings (FFI)"
         homepage = "https://mercurial-scm.org"
         license = "GPL-2.0-or-later"
         repository = "https://www.mercurial-scm.org/repo/hg"

         [lib]
         name = "hg"

         [dev-dependencies.rand]
         version = "~0.6"

         [dev-dependencies.rand_pcg]
         version = "~0.1"

     :param toml: toml object
     """

     name: str
     """The package name"""

     version: str
     """Package version"""

     authors: List[str]
     """Authors"""

     description: str
     """Package and release description"""

     homepage: str
     """Homepage of the project"""

     license: str
     """Package license"""

     repository: str
     """Source code repository"""


 @attr.s
 class CratesPackageInfo(BasePackageInfo):

     name = attr.ib(type=str)
     """Name of the package"""

     version = attr.ib(type=str)
     """Current version"""

     e_metadata: Dict[str, Any] = attr.ib(factory=ExtrinsicPackageMetadata)
     """Extrinsic package metadata, common to all versions"""

     e_metadata_version: Dict[str, Any] = attr.ib(
         factory=ExtrinsicVersionPackageMetadata
     )
     """Extrinsic package metadata specific to a version"""

     i_metadata: Dict[str, Any] = attr.ib(factory=IntrinsicPackageMetadata)
     """Intrinsic metadata of the current package version"""


 def extract_intrinsic_metadata(dir_path: Path) -> Dict[str, Any]:
     """Extract intrinsic metadata from Cargo.toml file at dir_path.

     Each crate archive has a Cargo.toml at the root of the archive.

     Args:
         dir_path: A directory on disk where a Cargo.toml must be present

     Returns:
         A dict mapping from toml parser
     """
     return toml.load(dir_path / "Cargo.toml")


 def extract_author(p_info: CratesPackageInfo) -> Person:
     """Extract package author from intrinsic metadata and return it as a
     `Person` model.

     Args:
         p_info: CratesPackageInfo that should contain i_metadata entries

     Returns:
         Only one author (Person) of the package. Currently limited by
         internal detail of the swh stack (see T3887).
     """
     authors = p_info.i_metadata["authors"]
     fullname = authors[0]  # TODO: here we have a list of authors, see T3887
     return Person.from_fullname(fullname.encode())


 def extract_description(p_info: CratesPackageInfo) -> str:
     """Extract package description from intrinsic metadata and return it as a
     string.

     Args:
         p_info: CratesPackageInfo that should contain i_metadata entries

     Returns:
         Package description from metadata.
     """
     return p_info.i_metadata["description"]


 class CratesLoader(PackageLoader[CratesPackageInfo]):
     """Load Crates package origins into swh archive."""

     visit_type = "crates"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         artifacts: List[Dict[str, Any]],
         **kwargs,
     ):
         """Constructor

         Args:

             url:
                 Origin url (e.g. https://crates.io/api/v1/crates/<package_name>)

             artifacts:
                 A list of dicts listing all existing released versions for a
                 package (usually set with crates lister `extra_loader_arguments`).
                 Each entry is a dict that should have a `url` (where to download
                 the package specific version) and a `version` entry.

                 Example::

                     [
                         {
                             "version": <version>,
                             "url": "https://static.crates.io/crates/<package_name>/<package_name>-<version>.crate",
                         }
                     ]
         """  # noqa
         super().__init__(storage=storage, url=url, **kwargs)
         self.url = url
         self.artifacts: Dict[str, Dict] = {
             artifact["version"]: artifact for artifact in artifacts
         }

     @cached_method
     def _raw_info(self) -> bytes:
         """Get crate metadata (fetched from http api endpoint set as self.url)

         Returns:
             Content response as bytes. Content response is a json document.
         """
-        return api_info(self.url)
+        return get_url_body(self.url)

     @cached_method
     def info(self) -> Dict:
         """Parse http api json response and return the crate metadata
         information as a Dict."""
         return json.loads(self._raw_info())

     def get_versions(self) -> Sequence[str]:
         """Get all released versions of a crate

         Returns:
             A sequence of versions

             Example::

                 ["0.1.1", "0.10.2"]
         """
         versions = list(self.artifacts.keys())
         versions.sort(key=StrictVersion)
         return versions

     def get_default_version(self) -> str:
         """Get the newest release version of a crate

         Returns:
             A string representing a version

             Example::

                 "0.1.2"
         """
         return self.get_versions()[-1]

     def get_package_info(self, version: str) -> Iterator[Tuple[str, CratesPackageInfo]]:
         """Get release name and package information from version

         Args:
             version: crate version (e.g: "0.1.0")

         Returns:
             Iterator of tuple (release_name, p_info)
         """
         artifact = self.artifacts[version]
         filename = artifact["filename"]
         package_name = urlparse(self.url).path.split("/")[-1]
         url = artifact["url"]

         # Get extrinsic metadata from http api
         e_metadata = ExtrinsicPackageMetadata(**self.info())  # type: ignore[misc]

         # Extract crate info for current version (One .crate file for a given version)
         (crate_version,) = [
             crate for crate in e_metadata["versions"] if crate["num"] == version
         ]
         e_metadata_version = ExtrinsicVersionPackageMetadata(  # type: ignore[misc]
             **crate_version
         )

         p_info = CratesPackageInfo(
             name=package_name,
             filename=filename,
             url=url,
             version=version,
             e_metadata=e_metadata,
             e_metadata_version=e_metadata_version,
         )
         yield release_name(version, filename), p_info

     def build_release(
         self, p_info: CratesPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:

         # Extract intrinsic metadata from dir_path/Cargo.toml
         name = p_info.name
         version = p_info.version
         dir_path = Path(uncompressed_path, f"{name}-{version}")
         i_metadata_raw = extract_intrinsic_metadata(dir_path)
         # Get only corresponding key of IntrinsicPackageMetadata
         i_metadata_keys = [k for k in IntrinsicPackageMetadata.__annotations__.keys()]
         # We use data only from "package" entry
         i_metadata = {
             k: v for k, v in i_metadata_raw["package"].items() if k in i_metadata_keys
         }
         p_info.i_metadata = IntrinsicPackageMetadata(**i_metadata)  # type: ignore[misc]

         author = extract_author(p_info)
         description = extract_description(p_info)
         message = (
             f"Synthetic release for Crate source package {p_info.name} "
             f"version {p_info.version}\n\n"
             f"{description}\n"
         )
         # The only way to get a value for updated_at is through extrinsic metadata
         updated_at = p_info.e_metadata_version.get("updated_at")

         return Release(
             name=version.encode(),
             author=author,
             date=TimestampWithTimezone.from_iso8601(updated_at),
             message=message.encode(),
             target_type=ObjectType.DIRECTORY,
             target=directory,
             synthetic=True,
         )
diff --git a/swh/loader/package/golang/loader.py b/swh/loader/package/golang/loader.py
index 30732d9..0bc68a4 100644
--- a/swh/loader/package/golang/loader.py
+++ b/swh/loader/package/golang/loader.py
@@ -1,104 +1,109 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 import re
 from typing import Iterator, Optional, Sequence, Tuple

 import attr

 from swh.loader.package.loader import BasePackageInfo, PackageLoader
-from swh.loader.package.utils import EMPTY_AUTHOR, api_info, cached_method, release_name
+from swh.loader.package.utils import (
+    EMPTY_AUTHOR,
+    get_url_body,
+    release_name,
+    cached_method,
+)
 from swh.model.model import ObjectType, Release, Sha1Git, TimestampWithTimezone
 from swh.storage.interface import StorageInterface

 logger = logging.getLogger(__name__)


 def _uppercase_encode(url: str) -> str:
     return re.sub("([A-Z]{1})", r"!\1", url).lower()


 @attr.s
 class GolangPackageInfo(BasePackageInfo):
     name = attr.ib(type=str)
     timestamp = attr.ib(type=Optional[TimestampWithTimezone])


 class GolangLoader(PackageLoader[GolangPackageInfo]):
     """Load Golang module zip file into SWH archive."""

     visit_type = "golang"

     GOLANG_PKG_DEV_URL = "https://pkg.go.dev"
     GOLANG_PROXY_URL = "https://proxy.golang.org"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         max_content_size: Optional[int] = None,
         **kwargs,
     ):
         super().__init__(storage, url, max_content_size=max_content_size, **kwargs)
         # The lister saves human-usable URLs, so we translate them to proxy URLs
         # for use in the loader.
         # This URL format is detailed in https://go.dev/ref/mod#goproxy-protocol
         assert url.startswith(
             self.GOLANG_PKG_DEV_URL
         ), "Go package URL (%s) not from %s" % (url, self.GOLANG_PKG_DEV_URL)
         self.name = url[len(self.GOLANG_PKG_DEV_URL) + 1 :]
         self.url = url.replace(self.GOLANG_PKG_DEV_URL, self.GOLANG_PROXY_URL)
         self.url = _uppercase_encode(self.url)

     def get_versions(self) -> Sequence[str]:
-        versions = api_info(f"{self.url}/@v/list").decode().splitlines()
+        versions = get_url_body(f"{self.url}/@v/list").decode().splitlines()
         # some go packages only have a development version not listed by the
         # endpoint above, so ensure to return it or it will be missed by the
         # golang loader
         default_version = self.get_default_version()
         if default_version not in versions:
             versions.append(default_version)
         return versions

     @cached_method
     def get_default_version(self) -> str:
-        latest = api_info(f"{self.url}/@latest")
+        latest = get_url_body(f"{self.url}/@latest")
         return json.loads(latest)["Version"]

     def _raw_info(self, version: str) -> dict:
         url = f"{self.url}/@v/{_uppercase_encode(version)}.info"
-        return json.loads(api_info(url))
+        return json.loads(get_url_body(url))

     def get_package_info(self, version: str) -> Iterator[Tuple[str, GolangPackageInfo]]:
         # Encode the name because creating nested folders can become problematic
         encoded_name = self.name.replace("/", "__")
         filename = f"{encoded_name}-{version}.zip"
         timestamp = TimestampWithTimezone.from_iso8601(self._raw_info(version)["Time"])
         p_info = GolangPackageInfo(
             url=f"{self.url}/@v/{version}.zip",
             filename=filename,
             version=version,
             timestamp=timestamp,
             name=self.name,
         )
         yield release_name(version), p_info

     def build_release(
         self, p_info: GolangPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:
         msg = (
             f"Synthetic release for Golang source package {p_info.name} "
             f"version {p_info.version}\n"
         )
         return Release(
             name=p_info.version.encode(),
             message=msg.encode(),
             date=p_info.timestamp,
             author=EMPTY_AUTHOR,  # Go modules offer very little metadata
             target_type=ObjectType.DIRECTORY,
             target=directory,
             synthetic=True,
         )
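`_uppercase_encode` above implements the case-encoding rule of the Go module proxy protocol (https://go.dev/ref/mod#goproxy-protocol): every uppercase letter is replaced by `!` followed by its lowercase form, so case-insensitive file systems cannot conflate distinct module paths. A small illustration (module path chosen for the example):

.. code-block:: python

    import re

    def _uppercase_encode(url: str) -> str:
        # "!" escapes each capital letter, then everything is lowercased
        return re.sub("([A-Z]{1})", r"!\1", url).lower()

    assert (
        _uppercase_encode("github.com/Azure/azure-sdk-for-go")
        == "github.com/!azure/azure-sdk-for-go"
    )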
diff --git a/swh/loader/package/nixguix/loader.py b/swh/loader/package/nixguix/loader.py
index 9648790..46eeaf0 100644
--- a/swh/loader/package/nixguix/loader.py
+++ b/swh/loader/package/nixguix/loader.py
@@ -1,308 +1,308 @@
 # Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import copy
 import json
 import logging
 import re
 from typing import Any, Dict, Iterator, List, Mapping, Optional, Set, Tuple

 import attr

 from swh.loader.package.loader import (
     BasePackageInfo,
     PackageLoader,
     PartialExtID,
     RawExtrinsicMetadataCore,
 )
-from swh.loader.package.utils import EMPTY_AUTHOR, api_info, cached_method
+from swh.loader.package.utils import EMPTY_AUTHOR, cached_method, get_url_body
 from swh.model import hashutil
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     ObjectType,
     Release,
     Sha1Git,
 )
 from swh.model.swhids import CoreSWHID
 from swh.storage.interface import StorageInterface

 logger = logging.getLogger(__name__)

 EXTID_TYPE = "subresource-integrity"
 """The ExtID is an ASCII string, as defined by
 https://w3c.github.io/webappsec-subresource-integrity/"""

 EXTID_VERSION = 0


 @attr.s
 class NixGuixPackageInfo(BasePackageInfo):
     raw_info = attr.ib(type=Dict[str, Any])

     integrity = attr.ib(type=str)
     """Hash of the archive, formatted as in the Subresource Integrity
     specification."""

     @classmethod
     def from_metadata(
         cls, metadata: Dict[str, Any], version: str
     ) -> "NixGuixPackageInfo":
         return cls(
             url=metadata["url"],
             filename=None,
             version=version,
             integrity=metadata["integrity"],
             raw_info=metadata,
         )

     def extid(self) -> PartialExtID:
         return (EXTID_TYPE, EXTID_VERSION, self.integrity.encode("ascii"))


 class NixGuixLoader(PackageLoader[NixGuixPackageInfo]):
     """Load sources from a sources.json file. This loader is used to load
     sources used by functional package managers (e.g. Nix and Guix).
     """

     visit_type = "nixguix"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         unsupported_file_extensions: List[str] = [],
         **kwargs: Any,
     ):
         super().__init__(storage=storage, url=url, **kwargs)
         self.provider_url = url
         self.unsupported_file_extensions = unsupported_file_extensions

     # Note: this could be renamed get_artifacts in the PackageLoader
     # base class.
     @cached_method
     def raw_sources(self):
         return retrieve_sources(self.origin.url)

     @cached_method
     def supported_sources(self):
         raw_sources = self.raw_sources()
         return clean_sources(
             parse_sources(raw_sources), self.unsupported_file_extensions
         )

     @cached_method
     def integrity_by_url(self) -> Dict[str, str]:
         sources = self.supported_sources()
         return {s["urls"][0]: s["integrity"] for s in sources["sources"]}

     def get_versions(self) -> List[str]:
         """The first mirror of the mirror list is used as branch name in the
         snapshot.
         """
         return list(self.integrity_by_url().keys())

     def get_metadata_authority(self):
         return MetadataAuthority(
             type=MetadataAuthorityType.FORGE,
             url=self.origin.url,
             metadata={},
         )

     def get_extrinsic_snapshot_metadata(self):
         return [
             RawExtrinsicMetadataCore(
                 format="nixguix-sources-json",
                 metadata=self.raw_sources(),
             ),
         ]

     # Note: this could be renamed get_artifact_info in the PackageLoader
     # base class.
     def get_package_info(self, url) -> Iterator[Tuple[str, NixGuixPackageInfo]]:
         # TODO: try all mirrors and not only the first one. A source
         # can be fetched from several urls, called mirrors. We
         # currently only use the first one, but if the first one
         # fails, we should try the second one and so on.
         integrity = self.integrity_by_url()[url]
         p_info = NixGuixPackageInfo.from_metadata(
             {"url": url, "integrity": integrity}, version=url
         )
         yield url, p_info

     def select_extid_target(
         self, p_info: NixGuixPackageInfo, extid_targets: Set[CoreSWHID]
     ) -> Optional[CoreSWHID]:
         if extid_targets:
             # The archive URL is part of the release name. As that URL is not
             # intrinsic metadata, it means different releases may be created for
             # the same SRI so they have the same extid.
             # Therefore, we need to pick the one with the right URL.
             releases = self.storage.release_get(
                 [target.object_id for target in extid_targets]
             )
             extid_targets = {
                 release.swhid()
                 for release in releases
                 if release is not None and release.name == p_info.version.encode()
             }
         return super().select_extid_target(p_info, extid_targets)

     def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
         """We add a branch to the snapshot called 'evaluation' pointing to the
         revision used to generate the sources.json file. This revision is
         specified in the sources.json file itself. For the nixpkgs origin, this
         revision is coming from the github.com/nixos/nixpkgs repository.

         Note this repository is not loaded explicitly. So, this pointer can
         target a nonexistent revision for a time. However, the github and gnu
         loaders are supposed to load this revision and should create the
         revision pointed by this branch.

         This branch can be used to identify the snapshot associated to a
         Nix/Guix evaluation.
         """
         # The revision used to create the sources.json file. For Nix,
         # this revision belongs to the github.com/nixos/nixpkgs
         # repository
         revision = self.supported_sources()["revision"]
         return {
             b"evaluation": {
                 "target_type": "revision",
                 "target": hashutil.hash_to_bytes(revision),
             }
         }

     def build_release(
         self, p_info: NixGuixPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:
         return Release(
             name=p_info.version.encode(),
             message=None,
             author=EMPTY_AUTHOR,
             date=None,
             target=directory,
             target_type=ObjectType.DIRECTORY,
             synthetic=True,
         )


 def retrieve_sources(url: str) -> bytes:
     """Retrieve sources. Potentially raise NotFound error."""
-    return api_info(url, allow_redirects=True)
+    return get_url_body(url, allow_redirects=True)


 def parse_sources(raw_sources: bytes) -> Dict[str, Any]:
     return json.loads(raw_sources.decode("utf-8"))


 def make_pattern_unsupported_file_extension(
     unsupported_file_extensions: List[str],
 ):
     """Make a regexp pattern matching unsupported file extensions, out of a
     list of unsupported archive extensions.
     """
     return re.compile(
         rf".*\.({'|'.join(map(re.escape, unsupported_file_extensions))})$", re.DOTALL
     )
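To make the filtering in `clean_sources` below concrete, here is what `make_pattern_unsupported_file_extension` produces and how it behaves (extensions and URLs chosen for the example):

.. code-block:: python

    pattern = make_pattern_unsupported_file_extension(["iso", "whl"])
    # pattern is re.compile(r".*\.(iso|whl)$", re.DOTALL)

    assert pattern.match("https://example.org/disk-image.iso")
    assert pattern.match("https://example.org/pkg-1.0-py3-none-any.whl")
    assert pattern.match("https://example.org/archive.tar.gz") is None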
 def clean_sources(
     sources: Dict[str, Any], unsupported_file_extensions=[]
 ) -> Dict[str, Any]:
     """Validate and clean the sources structure. First, ensure all top level
     keys are present. Then, walk the sources list and remove sources that do
     not contain required keys.

     Filter out source entries whose:
     - required keys are missing
     - source type is not supported
     - urls attribute type is not a list
     - extension is known not to be supported by the loader

     Raises:
         ValueError if:
         - a required top level key is missing
         - top-level version is not 1

     Returns:
         source Dict cleaned up
     """
     pattern_unsupported_file = make_pattern_unsupported_file_extension(
         unsupported_file_extensions
     )
     # Required top level keys
     required_keys = ["version", "revision", "sources"]
     missing_keys = []
     for required_key in required_keys:
         if required_key not in sources:
             missing_keys.append(required_key)

     if missing_keys != []:
         raise ValueError(
             f"sources structure invalid, missing: {','.join(missing_keys)}"
         )

     # Only version 1 is currently supported
     version = int(sources["version"])
     if version != 1:
         raise ValueError(
             f"The sources structure version '{sources['version']}' is not supported"
         )

     # If a source doesn't contain required attributes, this source is
     # skipped but others could still be archived.
     verified_sources = []
     for source in sources["sources"]:
         valid = True
         required_keys = ["urls", "integrity", "type"]
         for required_key in required_keys:
             if required_key not in source:
                 logger.info(
                     f"Skip source '{source}' because key '{required_key}' is missing",
                 )
                 valid = False

         if valid and source["type"] != "url":
             logger.info(
                 f"Skip source '{source}' because the type {source['type']} "
                 "is not supported",
             )
             valid = False

         if valid and not isinstance(source["urls"], list):
             logger.info(
                 f"Skip source {source} because the urls attribute is not a list"
             )
             valid = False

         if valid and len(source["urls"]) > 0:
             # Filter out unsupported archives
             supported_sources: List[str] = []
             for source_url in source["urls"]:
                 if pattern_unsupported_file.match(source_url):
                     logger.info(f"Skip unsupported artifact url {source_url}")
                     continue
                 supported_sources.append(source_url)

             if len(supported_sources) == 0:
                 logger.info(
                     f"Skip source {source} because urls only reference "
                     "unsupported artifacts. Unsupported "
                     f"artifacts so far: {pattern_unsupported_file}"
                 )
                 continue

             new_source = copy.deepcopy(source)
             new_source["urls"] = supported_sources
             verified_sources.append(new_source)

     sources["sources"] = verified_sources
     return sources
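A hypothetical input/output pair for `clean_sources`, showing one valid source kept and one skipped for its unsupported type (all values are dummies):

.. code-block:: python

    sources = {
        "version": "1",
        "revision": "0" * 40,  # dummy nixpkgs revision
        "sources": [
            {
                "type": "url",
                "urls": ["https://example.org/pkg-1.0.tar.gz"],
                "integrity": "sha256-AAAA",
            },
            {
                "type": "git",  # not "url": skipped with a log message
                "urls": ["https://example.org/repo.git"],
                "integrity": "sha256-BBBB",
            },
        ],
    }

    cleaned = clean_sources(sources)
    assert len(cleaned["sources"]) == 1
    assert cleaned["sources"][0]["type"] == "url"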
Unsupported " f"artifacts so far: {pattern_unsupported_file}" ) continue new_source = copy.deepcopy(source) new_source["urls"] = supported_sources verified_sources.append(new_source) sources["sources"] = verified_sources return sources diff --git a/swh/loader/package/npm/loader.py b/swh/loader/package/npm/loader.py index 91081c6..a44e22d 100644 --- a/swh/loader/package/npm/loader.py +++ b/swh/loader/package/npm/loader.py @@ -1,300 +1,300 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from codecs import BOM_UTF8 import json import logging import os import string from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple, Union from urllib.parse import quote import attr import chardet from swh.loader.package.loader import ( BasePackageInfo, PackageLoader, RawExtrinsicMetadataCore, ) -from swh.loader.package.utils import api_info, cached_method, release_name +from swh.loader.package.utils import cached_method, get_url_body, release_name from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, ObjectType, Person, Release, Sha1Git, TimestampWithTimezone, ) from swh.storage.interface import StorageInterface logger = logging.getLogger(__name__) EMPTY_PERSON = Person.from_fullname(b"") @attr.s class NpmPackageInfo(BasePackageInfo): raw_info = attr.ib(type=Dict[str, Any]) package_name = attr.ib(type=str) date = attr.ib(type=Optional[str]) shasum = attr.ib(type=str) """sha1 checksum""" # we cannot rely only on $shasum, as it is technically possible for two versions # of the same package to have the exact same tarball. # But the release data (message and date) are extrinsic to the content of the # package, so they differ between versions. # So we need every attribute used to build the release object to be part of the # manifest. MANIFEST_FORMAT = string.Template( "date $date\nname $package_name\nshasum $shasum\nurl $url\nversion $version" ) EXTID_TYPE = "npm-manifest-sha256" EXTID_VERSION = 0 @classmethod def from_metadata( cls, project_metadata: Dict[str, Any], version: str ) -> "NpmPackageInfo": package_metadata = project_metadata["versions"][version] url = package_metadata["dist"]["tarball"] assert package_metadata["name"] == project_metadata["name"] # No date available in intrinsic metadata: retrieve it from the API # metadata, using the version number that the API claims this package # has. extrinsic_version = package_metadata["version"] if "time" in project_metadata: date = project_metadata["time"][extrinsic_version] elif "mtime" in package_metadata: date = package_metadata["mtime"] else: date = None return cls( package_name=package_metadata["name"], url=url, filename=os.path.basename(url), date=date, shasum=package_metadata["dist"]["shasum"], version=extrinsic_version, raw_info=package_metadata, directory_extrinsic_metadata=[ RawExtrinsicMetadataCore( format="replicate-npm-package-json", metadata=json.dumps(package_metadata).encode(), ) ], ) class NpmLoader(PackageLoader[NpmPackageInfo]): """Load npm origin's artifact releases into swh archive.""" visit_type = "npm" def __init__(self, storage: StorageInterface, url: str, **kwargs: Any): """Constructor Args str: origin url (e.g. 
 class NpmLoader(PackageLoader[NpmPackageInfo]):
     """Load npm origin's artifact releases into swh archive."""

     visit_type = "npm"

     def __init__(self, storage: StorageInterface, url: str, **kwargs: Any):
         """Constructor

         Args:
             url: origin url (e.g. https://www.npmjs.com/package/<package_name>)
         """
         super().__init__(storage=storage, url=url, **kwargs)
         self.package_name = url.split("https://www.npmjs.com/package/")[1]
         safe_name = quote(self.package_name, safe="")
         self.provider_url = f"https://replicate.npmjs.com/{safe_name}/"
         self._info: Dict[str, Any] = {}
         self._versions = None

     @cached_method
     def _raw_info(self) -> bytes:
-        return api_info(self.provider_url)
+        return get_url_body(self.provider_url)

     @cached_method
     def info(self) -> Dict:
         """Return the project metadata information (fetched from npm registry)"""
         return json.loads(self._raw_info())

     def get_versions(self) -> Sequence[str]:
         return sorted(list(self.info()["versions"].keys()))

     def get_default_version(self) -> str:
         return self.info()["dist-tags"].get("latest", "")

     def get_metadata_authority(self):
         return MetadataAuthority(
             type=MetadataAuthorityType.FORGE,
             url="https://npmjs.com/",
             metadata={},
         )

     def get_package_info(self, version: str) -> Iterator[Tuple[str, NpmPackageInfo]]:
         p_info = NpmPackageInfo.from_metadata(
             project_metadata=self.info(), version=version
         )
         yield release_name(version), p_info

     def build_release(
         self, p_info: NpmPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:
         # Metadata from NPM is not intrinsic to tarballs.
         # This means two package versions can have the same tarball, but different
         # metadata. To avoid mixing up releases, every field used to build the
         # release object must be part of NpmPackageInfo.MANIFEST_FORMAT.
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
         if not i_metadata:
             return None
         author = extract_npm_package_author(i_metadata)
         assert self.package_name == p_info.package_name
         msg = (
             f"Synthetic release for NPM source package {p_info.package_name} "
             f"version {p_info.version}\n"
         )

         if p_info.date is None:
             url = p_info.url
             artifact_name = os.path.basename(url)
             raise ValueError(
                 "Origin %s: Cannot determine upload time for artifact %s."
                 % (p_info.url, artifact_name)
             )

         date = TimestampWithTimezone.from_iso8601(p_info.date)

         # FIXME: this is to remain bug-compatible with earlier versions:
         date = attr.evolve(date, timestamp=attr.evolve(date.timestamp, microseconds=0))

         r = Release(
             name=p_info.version.encode(),
             message=msg.encode(),
             author=author,
             date=date,
             target=directory,
             target_type=ObjectType.DIRECTORY,
             synthetic=True,
         )
         return r


 def _author_str(author_data: Union[Dict, List, str]) -> str:
     """Parse author from package.json author fields"""
     if isinstance(author_data, dict):
         author_str = ""
         name = author_data.get("name")
         if name is not None:
             if isinstance(name, str):
                 author_str += name
             elif isinstance(name, list):
                 author_str += _author_str(name[0]) if len(name) > 0 else ""
         email = author_data.get("email")
         if email is not None:
             author_str += f" <{email}>"
         result = author_str
     elif isinstance(author_data, list):
         result = _author_str(author_data[0]) if len(author_data) > 0 else ""
     else:
         result = author_data
     return result


 def extract_npm_package_author(package_json: Dict[str, Any]) -> Person:
     """
     Extract package author from a ``package.json`` file content and return it
     in swh format.

     Args:
         package_json: Dict holding the content of parsed ``package.json`` file

     Returns:
         Person
     """
     for author_key in ("author", "authors"):
         if author_key in package_json:
             author_data = package_json[author_key]
             if author_data is None:
                 return EMPTY_PERSON
             author_str = _author_str(author_data)
             return Person.from_fullname(author_str.encode())

     return EMPTY_PERSON


 def _lstrip_bom(s, bom=BOM_UTF8):
     if s.startswith(bom):
         return s[len(bom) :]
     else:
         return s


 def load_json(json_bytes):
     """
     Try to load JSON from bytes and return a dictionary.

     First try to decode from utf-8. If the decoding failed,
     try to detect the encoding and decode again with replace
     error handling.

     If JSON is malformed, an empty dictionary will be returned.

     Args:
         json_bytes (bytes): binary content of a JSON file

     Returns:
         dict: JSON data loaded in a dictionary
     """
     json_data = {}
     try:
         json_str = _lstrip_bom(json_bytes).decode("utf-8")
     except UnicodeDecodeError:
         encoding = chardet.detect(json_bytes)["encoding"]
         if encoding:
             json_str = json_bytes.decode(encoding, "replace")
     try:
         json_data = json.loads(json_str)
     except json.decoder.JSONDecodeError:
         pass
     return json_data


 def extract_intrinsic_metadata(dir_path: str) -> Dict:
     """Given an uncompressed path holding the pkginfo file, returns a
     pkginfo parsed structure as a dict.

     The release artifact contains one folder at its root. For example:

     $ tar tvf zprint-0.0.6.tar.gz
     drwxr-xr-x root/root         0 2018-08-22 11:01 zprint-0.0.6/
     ...

     Args:
         dir_path (str): Path to the uncompressed directory
                         representing a release artifact from npm.

     Returns:
         the pkginfo parsed structure as a dict if any or None if
         none was present.
     """
     # Retrieve the root folder of the archive
     if not os.path.exists(dir_path):
         return {}
     lst = os.listdir(dir_path)
     if len(lst) == 0:
         return {}
     project_dirname = lst[0]
     package_json_path = os.path.join(dir_path, project_dirname, "package.json")
     if not os.path.exists(package_json_path):
         return {}
     with open(package_json_path, "rb") as package_json_file:
         package_json_bytes = package_json_file.read()
         return load_json(package_json_bytes)
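The three input shapes `_author_str` accepts, as found in real-world ``package.json`` files (the names and addresses are made up):

.. code-block:: python

    assert (
        _author_str({"name": "Jane Doe", "email": "jane@example.org"})
        == "Jane Doe <jane@example.org>"
    )
    # Lists keep only the first author, consistently with T3887:
    assert _author_str([{"name": "Jane Doe"}, {"name": "John Doe"}]) == "Jane Doe"
    assert _author_str("Jane Doe <jane@example.org>") == "Jane Doe <jane@example.org>"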
diff --git a/swh/loader/package/pubdev/loader.py b/swh/loader/package/pubdev/loader.py
index bcce138..608457a 100644
--- a/swh/loader/package/pubdev/loader.py
+++ b/swh/loader/package/pubdev/loader.py
@@ -1,194 +1,194 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 from pathlib import Path
 from typing import Any, Dict, Iterator, Optional, Sequence, Tuple

 import attr
 from packaging.version import parse as parse_version
 import yaml

 from swh.loader.package.loader import BasePackageInfo, PackageLoader
 from swh.loader.package.utils import (
     EMPTY_AUTHOR,
     Person,
-    api_info,
     cached_method,
+    get_url_body,
     release_name,
 )
 from swh.model.model import ObjectType, Release, Sha1Git, TimestampWithTimezone
 from swh.storage.interface import StorageInterface


 @attr.s
 class PubDevPackageInfo(BasePackageInfo):
     name = attr.ib(type=str)
     """Name of the package"""

     version = attr.ib(type=str)
     """Current version"""

     last_modified = attr.ib(type=str)
     """Last modified date as release date"""

     author = attr.ib(type=Person)
     """Author"""

     description = attr.ib(type=str)
     """Description"""


 def extract_intrinsic_metadata(dir_path: Path) -> Dict[str, Any]:
     """Extract intrinsic metadata from pubspec.yaml file at dir_path.

     Each pub.dev package version has a pubspec.yaml file at the root of the
     archive.

     See https://dart.dev/tools/pub/pubspec for pubspec specifications.

     Args:
         dir_path: A directory on disk where a pubspec.yaml must be present

     Returns:
         A dict mapping from yaml parser
     """
     pubspec_path = dir_path / "pubspec.yaml"
     return yaml.safe_load(pubspec_path.read_text())


 class PubDevLoader(PackageLoader[PubDevPackageInfo]):
     visit_type = "pubdev"

     PUBDEV_BASE_URL = "https://pub.dev/"

     def __init__(
         self,
         storage: StorageInterface,
         url: str,
         **kwargs,
     ):
         super().__init__(storage=storage, url=url, **kwargs)
         self.url = url
         assert url.startswith(self.PUBDEV_BASE_URL)
         self.package_info_url = url.replace(
             self.PUBDEV_BASE_URL, f"{self.PUBDEV_BASE_URL}api/"
         )

     def _raw_info(self) -> bytes:
-        return api_info(self.package_info_url)
+        return get_url_body(self.package_info_url)

     @cached_method
     def info(self) -> Dict:
         """Return the project metadata information (fetched from pub.dev registry)"""
         # Use strict=False in order to correctly manage case where \n is present in a string
         info = json.loads(self._raw_info(), strict=False)
         # Arrange versions list as a new dict with `version` as key
         versions = {v["version"]: v for v in info["versions"]}
         info["versions"] = versions
         return info

     def get_versions(self) -> Sequence[str]:
         """Get all released versions of a PubDev package

         Returns:
             A sequence of versions

             Example::

                 ["0.1.1", "0.10.2"]
         """
         versions = list(self.info()["versions"].keys())
         versions.sort(key=parse_version)
         return versions

     def get_default_version(self) -> str:
         """Get the newest release version of a PubDev package

         Returns:
             A string representing a version

             Example::

                 "0.1.2"
         """
         latest = self.info()["latest"]
         return latest["version"]

     def get_package_info(self, version: str) -> Iterator[Tuple[str, PubDevPackageInfo]]:
         """Get release name and package information from version

         Package info comes from extrinsic metadata (from self.info())

         Args:
             version: Package version (e.g: "0.1.0")

         Returns:
             Iterator of tuple (release_name, p_info)
         """
         v = self.info()["versions"][version]
         assert v["version"] == version

         url = v["archive_url"]
         name = v["pubspec"]["name"]
         filename = f"{name}-{version}.tar.gz"
         last_modified = v["published"]

         if "authors" in v["pubspec"]:
             # TODO: here we have a list of authors, see T3887
             author = Person.from_fullname(v["pubspec"]["authors"][0].encode())
         elif "author" in v["pubspec"] and v["pubspec"]["author"] is not None:
             author = Person.from_fullname(v["pubspec"]["author"].encode())
         else:
             author = EMPTY_AUTHOR

         description = v["pubspec"]["description"]

         p_info = PubDevPackageInfo(
             name=name,
             filename=filename,
             url=url,
             version=version,
             last_modified=last_modified,
             author=author,
             description=description,
         )
         yield release_name(version), p_info

     def build_release(
         self, p_info: PubDevPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:
         # Extract intrinsic metadata from uncompressed_path/pubspec.yaml
         intrinsic_metadata = extract_intrinsic_metadata(Path(uncompressed_path))

         name: str = intrinsic_metadata["name"]
         version: str = intrinsic_metadata["version"]
         assert version == p_info.version

         # author from intrinsic_metadata should not take precedence over the one
         # returned by the api, see https://dart.dev/tools/pub/pubspec#authorauthors
         author: Person = p_info.author

         if "description" in intrinsic_metadata and intrinsic_metadata["description"]:
             description = intrinsic_metadata["description"]
         else:
             description = p_info.description

         message = (
             f"Synthetic release for pub.dev source package {name} "
             f"version {version}\n\n"
             f"{description}\n"
         )

         return Release(
             name=version.encode(),
             author=author,
             date=TimestampWithTimezone.from_iso8601(p_info.last_modified),
             message=message.encode(),
             target_type=ObjectType.DIRECTORY,
             target=directory,
             synthetic=True,
         )
diff --git a/swh/loader/package/pypi/loader.py b/swh/loader/package/pypi/loader.py
index cb427a9..fe814f7 100644
--- a/swh/loader/package/pypi/loader.py
+++ b/swh/loader/package/pypi/loader.py
@@ -1,243 +1,248 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 import os
 from typing import Any, Dict, Iterator, Optional, Sequence, Tuple
 from urllib.parse import urlparse

 import attr
 from pkginfo import UnpackedSDist

 from swh.loader.package.loader import (
     BasePackageInfo,
     PackageLoader,
     PartialExtID,
     RawExtrinsicMetadataCore,
 )
-from swh.loader.package.utils import EMPTY_AUTHOR, api_info, cached_method, release_name
+from swh.loader.package.utils import (
+    EMPTY_AUTHOR,
+    cached_method,
+    get_url_body,
+    release_name,
+)
 from swh.model.hashutil import hash_to_bytes
 from swh.model.model import (
     MetadataAuthority,
     MetadataAuthorityType,
     ObjectType,
     Person,
     Release,
     Sha1Git,
     TimestampWithTimezone,
 )
 from swh.storage.interface import StorageInterface

 logger = logging.getLogger(__name__)

 EXTID_TYPE = "pypi-archive-sha256"
 EXTID_VERSION = 0


 @attr.s
 class PyPIPackageInfo(BasePackageInfo):
     raw_info = attr.ib(type=Dict[str, Any])

     name = attr.ib(type=str)
     comment_text = attr.ib(type=Optional[str])
     sha256 = attr.ib(type=str)
     upload_time = attr.ib(type=str)

     @classmethod
     def from_metadata(
         cls, metadata: Dict[str, Any], name: str, version: str
     ) -> "PyPIPackageInfo":
         return cls(
             url=metadata["url"],
             filename=metadata["filename"],
             version=version,
             raw_info=metadata,
             name=name,
             comment_text=metadata.get("comment_text"),
             sha256=metadata["digests"]["sha256"],
             upload_time=metadata["upload_time"],
             directory_extrinsic_metadata=[
                 RawExtrinsicMetadataCore(
                     format="pypi-project-json",
                     metadata=json.dumps(metadata).encode(),
                 )
             ],
         )

     def extid(self) -> PartialExtID:
         return (EXTID_TYPE, EXTID_VERSION, hash_to_bytes(self.sha256))


 class PyPILoader(PackageLoader[PyPIPackageInfo]):
     """Load pypi origin's artifact releases into swh archive."""

     visit_type = "pypi"

     def __init__(self, storage: StorageInterface, url: str, **kwargs):
         super().__init__(storage=storage, url=url, **kwargs)
         self.provider_url = pypi_api_url(self.origin.url)

     @cached_method
     def _raw_info(self) -> bytes:
-        return api_info(self.provider_url)
+        return get_url_body(self.provider_url)

     @cached_method
     def info(self) -> Dict:
         """Return the project metadata information (fetched from pypi registry)"""
         return json.loads(self._raw_info())

     def get_versions(self) -> Sequence[str]:
         return self.info()["releases"].keys()

     def get_default_version(self) -> str:
         return self.info()["info"]["version"]

     def get_metadata_authority(self):
         p_url = urlparse(self.origin.url)
         return MetadataAuthority(
             type=MetadataAuthorityType.FORGE,
             url=f"{p_url.scheme}://{p_url.netloc}/",
             metadata={},
         )

     def get_package_info(self, version: str) -> Iterator[Tuple[str, PyPIPackageInfo]]:
         res = []
         for meta in self.info()["releases"][version]:
             # process only standard sdist archives
             if meta["packagetype"] != "sdist" or meta["filename"].lower().endswith(
                 (".deb", ".egg", ".rpm", ".whl")
             ):
                 continue
             p_info = PyPIPackageInfo.from_metadata(
                 meta, name=self.info()["info"]["name"], version=version
             )
             res.append((version, p_info))

         if len(res) == 1:
             version, p_info = res[0]
             yield release_name(version), p_info
         else:
             for version, p_info in res:
                 yield release_name(version, p_info.filename), p_info

     def build_release(
         self, p_info: PyPIPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Release]:
         i_metadata = extract_intrinsic_metadata(uncompressed_path)
         if not i_metadata:
             return None

         # from intrinsic metadata
         version_ = i_metadata.get("version", p_info.version)
         author_ = author(i_metadata)

         if p_info.comment_text:
             msg = p_info.comment_text
         else:
             msg = (
                 f"Synthetic release for PyPI source package {p_info.name} "
                 f"version {version_}\n"
             )

         date = TimestampWithTimezone.from_iso8601(p_info.upload_time)

         return Release(
             name=p_info.version.encode(),
             message=msg.encode(),
             author=author_,
             date=date,
             target=directory,
             target_type=ObjectType.DIRECTORY,
             synthetic=True,
         )


 def pypi_api_url(url: str) -> str:
     """Compute api url from a project url

     Args:
         url (str): PyPI instance's url (e.g: https://pypi.org/project/requests)
             This deals with correctly transforming the project's api url (e.g
             https://pypi.org/pypi/requests/json)

     Returns:
         api url
     """
     p_url = urlparse(url)
     project_name = p_url.path.rstrip("/").split("/")[-1]
     url = "%s://%s/pypi/%s/json" % (p_url.scheme, p_url.netloc, project_name)
     return url


 def extract_intrinsic_metadata(dir_path: str) -> Dict:
     """Given an uncompressed path holding the pkginfo file, returns a
     pkginfo parsed structure as a dict.

     The release artifact contains one folder at its root. For example:

     $ tar tvf zprint-0.0.6.tar.gz
     drwxr-xr-x root/root         0 2018-08-22 11:01 zprint-0.0.6/
     ...

     Args:
         dir_path (str): Path to the uncompressed directory
                         representing a release artifact from pypi.

     Returns:
         the pkginfo parsed structure as a dict if any or None if
         none was present.
     """
     # Retrieve the root folder of the archive
     if not os.path.exists(dir_path):
         return {}
     lst = os.listdir(dir_path)
     if len(lst) != 1:
         return {}
     project_dirname = lst[0]
     pkginfo_path = os.path.join(dir_path, project_dirname, "PKG-INFO")
     if not os.path.exists(pkginfo_path):
         return {}
     pkginfo = UnpackedSDist(pkginfo_path)
     raw = pkginfo.__dict__
     raw.pop("filename")  # this gets added with the ondisk location
     return raw


 def author(data: Dict) -> Person:
     """Given a dict of project/release artifact information (coming from
     PyPI), returns an author subset.

     Args:
         data (dict): Representing either artifact information or
                      release information.

     Returns:
         swh-model dict representing a person.
     """
     name = data.get("author")
     email = data.get("author_email")
     fullname = None  # type: Optional[str]

     if email:
         fullname = "%s <%s>" % (name, email)
     else:
         fullname = name

     if not fullname:
         return EMPTY_AUTHOR

     if name is not None:
         name = name.encode("utf-8")

     if email is not None:
         email = email.encode("utf-8")

     return Person(fullname=fullname.encode("utf-8"), name=name, email=email)
diff --git a/swh/loader/package/tests/test_utils.py b/swh/loader/package/tests/test_utils.py
index ea0e166..acff6af 100644
--- a/swh/loader/package/tests/test_utils.py
+++ b/swh/loader/package/tests/test_utils.py
@@ -1,308 +1,308 @@
 # Copyright (C) 2019-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import os
 from unittest.mock import MagicMock
 from urllib.error import URLError
 from urllib.parse import quote

 import pytest
 from requests.exceptions import HTTPError

 import swh.loader.package
-from swh.loader.package.utils import api_info, download, release_name
+from swh.loader.package.utils import download, get_url_body, release_name


 def test_version_generation():
     assert (
         swh.loader.package.__version__ != "devel"
     ), "Make sure swh.loader.core is installed (e.g. pip install -e .)"


 @pytest.mark.fs
 def test_download_fail_to_download(tmp_path, requests_mock):
     url = "https://pypi.org/pypi/arrow/json"
     status_code = 404
     requests_mock.get(url, status_code=status_code)

     with pytest.raises(
         HTTPError, match=f"{status_code} Client Error: None for url: {url}"
     ):
         download(url, tmp_path)


 _filename = "requests-0.0.1.tar.gz"
 _data = "this is something"


 def _check_download_ok(url, dest, filename=_filename, hashes={}):
     actual_filepath, actual_hashes = download(url, dest, hashes=hashes)
     actual_filename = os.path.basename(actual_filepath)
     assert actual_filename == filename
     assert actual_hashes["length"] == len(_data)
     assert (
         actual_hashes["checksums"]["sha1"] == "fdd1ce606a904b08c816ba84f3125f2af44d92b2"
     )
     assert (
         actual_hashes["checksums"]["sha256"]
         == "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5"
     )


 @pytest.mark.fs
 def test_download_ok(tmp_path, requests_mock):
     """Download without issue should provide filename and hashes"""
     url = f"https://pypi.org/pypi/requests/{_filename}"
     requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))})
     _check_download_ok(url, dest=str(tmp_path))


 @pytest.mark.fs
 def test_download_ok_no_header(tmp_path, requests_mock):
     """Download without issue should provide filename and hashes"""
     url = f"https://pypi.org/pypi/requests/{_filename}"
     requests_mock.get(url, text=_data)  # no header information
     _check_download_ok(url, dest=str(tmp_path))


 @pytest.mark.fs
 def test_download_ok_with_hashes(tmp_path, requests_mock):
     """Download without issue should provide filename and hashes"""
     url = f"https://pypi.org/pypi/requests/{_filename}"
     requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))})

     # good hashes for such file
     good = {
         "sha1": "fdd1ce606a904b08c816ba84f3125f2af44d92b2",
         "sha256": "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5",  # noqa
     }

     _check_download_ok(url, dest=str(tmp_path), hashes=good)


 @pytest.mark.fs
 def test_download_fail_hashes_mismatch(tmp_path, requests_mock):
     """Mismatch hash after download should raise"""
     url = f"https://pypi.org/pypi/requests/{_filename}"
     requests_mock.get(url, text=_data, headers={"content-length": str(len(_data))})

     # good hashes for such file
     good = {
         "sha1": "fdd1ce606a904b08c816ba84f3125f2af44d92b2",
         "sha256": "1d9224378d77925d612c9f926eb9fb92850e6551def8328011b6a972323298d5",  # noqa
     }

     for hash_algo in good.keys():
         wrong_hash = good[hash_algo].replace("1", "0")
         expected_hashes = good.copy()
         expected_hashes[hash_algo] = wrong_hash  # set the wrong hash

         expected_msg = "Failure when fetching %s. " "Checksum mismatched: %s != %s" % (
             url,
             wrong_hash,
             good[hash_algo],
         )

         with pytest.raises(ValueError, match=expected_msg):
             download(url, dest=str(tmp_path), hashes=expected_hashes)


 @pytest.mark.fs
 def test_ftp_download_ok(tmp_path, mocker):
     """Download without issue should provide filename and hashes"""
     url = f"ftp://pypi.org/pypi/requests/{_filename}"

     cm = MagicMock()
     cm.getstatus.return_value = 200
     cm.read.side_effect = [_data.encode(), b""]
     cm.__enter__.return_value = cm
     mocker.patch("swh.loader.package.utils.urlopen").return_value = cm

     _check_download_ok(url, dest=str(tmp_path))


 @pytest.mark.fs
 def test_ftp_download_ko(tmp_path, mocker):
     """Download without issue should provide filename and hashes"""
     filename = "requests-0.0.1.tar.gz"
     url = "ftp://pypi.org/pypi/requests/%s" % filename

     mocker.patch("swh.loader.package.utils.urlopen").side_effect = URLError("FTP error")

     with pytest.raises(URLError):
         download(url, dest=str(tmp_path))


 @pytest.mark.fs
 def test_download_with_redirection(tmp_path, requests_mock):
     """Download with redirection should use the targeted URL to extract filename"""
     url = "https://example.org/project/requests/download"
     redirection_url = f"https://example.org/project/requests/files/{_filename}"

     requests_mock.get(url, status_code=302, headers={"location": redirection_url})
     requests_mock.get(
         redirection_url, text=_data, headers={"content-length": str(len(_data))}
     )

     _check_download_ok(url, dest=str(tmp_path))


 def test_download_extracting_filename_from_url(tmp_path, requests_mock):
     """Extracting filename from url must sanitize the filename first"""
     url = "https://example.org/project/requests-0.0.1.tar.gz?a=b&c=d&foo=bar"

     requests_mock.get(
         url, status_code=200, text=_data, headers={"content-length": str(len(_data))}
     )

     _check_download_ok(url, dest=str(tmp_path))


 @pytest.mark.fs
 @pytest.mark.parametrize(
     "filename", [f'"{_filename}"', _filename, '"filename with spaces.tar.gz"']
 )
 def test_download_filename_from_content_disposition(tmp_path, requests_mock, filename):
     """Filename should be extracted from content-disposition request header
     when available."""
     url = "https://example.org/download/requests/tar.gz/v0.0.1"

     requests_mock.get(
         url,
         text=_data,
         headers={
             "content-length": str(len(_data)),
             "content-disposition": f"attachment; filename={filename}",
         },
     )

     _check_download_ok(url, dest=str(tmp_path), filename=filename.strip('"'))


 @pytest.mark.fs
 @pytest.mark.parametrize("filename", ['"archive école.tar.gz"', "archive_école.tgz"])
 def test_download_utf8_filename_from_content_disposition(
     tmp_path, requests_mock, filename
 ):
     """Filename should be extracted from content-disposition request header
     when available."""
     url = "https://example.org/download/requests/tar.gz/v0.0.1"
     data = "this is something"

     requests_mock.get(
         url,
         text=data,
         headers={
             "content-length": str(len(data)),
             "content-disposition": f"attachment; filename*=utf-8''{quote(filename)}",
         },
     )

     _check_download_ok(url, dest=str(tmp_path), filename=filename.strip('"'))


 def test_api_info_failure(requests_mock):
     """Failure to fetch info/release information should raise"""
     url = "https://pypi.org/pypi/requests/json"
     status_code = 400
     requests_mock.get(url, status_code=status_code)

     with pytest.raises(
         HTTPError, match=f"{status_code} Client Error: None for url: {url}"
     ):
-        api_info(url)
+        get_url_body(url)


 def test_api_info(requests_mock):
     """Fetching json info from pypi project should be ok"""
     url = "https://pypi.org/pypi/requests/json"
     requests_mock.get(url, text='{"version": "0.0.1"}')
-    actual_info = json.loads(api_info(url))
+    actual_info = json.loads(get_url_body(url))
     assert actual_info == {
         "version": "0.0.1",
     }


 def test_release_name():
     for version, filename, expected_release in [
         ("0.0.1", None, "releases/0.0.1"),
         ("0.0.2", "something", "releases/0.0.2/something"),
     ]:
         assert release_name(version, filename) == expected_release


 @pytest.fixture(autouse=True)
 def mock_download_retry_sleep(mocker):
     mocker.patch.object(download.retry, "sleep")


 def test_download_retry(mocker, requests_mock, tmp_path):
     url = f"https://example.org/project/requests/files/{_filename}"

     requests_mock.get(
         url,
         [
             {"status_code": 429},
             {"status_code": 429},
             {
                 "text": _data,
                 "headers": {"content-length": str(len(_data))},
                 "status_code": 200,
             },
         ],
     )

     _check_download_ok(url, dest=str(tmp_path))


 def test_download_retry_reraise(mocker, requests_mock, tmp_path):
     url = f"https://example.org/project/requests/files/{_filename}"

     requests_mock.get(
         url,
         [{"status_code": 429}] * 5,
     )

     with pytest.raises(HTTPError):
         _check_download_ok(url, dest=str(tmp_path))


 @pytest.fixture(autouse=True)
 def mock_api_info_retry_sleep(mocker):
-    mocker.patch.object(api_info.retry, "sleep")
+    mocker.patch.object(get_url_body.retry, "sleep")


 def test_api_info_retry(mocker, requests_mock, tmp_path):
     url = "https://example.org/api/endpoint"
     json_data = {"foo": "bar"}

     requests_mock.get(
         url,
         [
             {"status_code": 429},
             {"status_code": 429},
             {
                 "json": json_data,
                 "status_code": 200,
             },
         ],
     )

-    assert json.loads(api_info(url)) == json_data
+    assert json.loads(get_url_body(url)) == json_data


 def test_api_info_retry_reraise(mocker, requests_mock, tmp_path):
     url = "https://example.org/api/endpoint"

     requests_mock.get(
         url,
         [{"status_code": 429}] * 5,
     )

     with pytest.raises(HTTPError, match=f"429 Client Error: None for url: {url}"):
-        api_info(url)
+        get_url_body(url)
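The two autouse fixtures above can patch `download.retry` and `get_url_body.retry` because tenacity attaches its retry controller to the decorated callable; the same pattern in isolation (a sketch, assuming tenacity's documented behaviour of exposing the controller as a `retry` attribute):

.. code-block:: python

    from tenacity import retry
    from tenacity.stop import stop_after_attempt

    @retry(stop=stop_after_attempt(3), reraise=True)
    def fetch():
        raise RuntimeError("always fails")

    # The decorated function exposes the retry controller, so tests can
    # neutralize the waiting between attempts:
    fetch.retry.sleep = lambda seconds: None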
fnames and "utf-8''" in fnames[0].lower(): # RFC 5987 fname = re.sub("utf-8''", "", fnames[0], flags=re.IGNORECASE) fname = unquote(fname) elif fnames: fname = fnames[0] if fname: fname = os.path.basename(fname.strip().strip('"')) return fname def _retry_if_throttling(retry_state) -> bool: """Custom tenacity retry predicate for handling HTTP responses with status code 429 (too many requests). """ attempt = retry_state.outcome if attempt.failed: exception = attempt.exception() return ( isinstance(exception, HTTPError) and exception.response.status_code == 429 ) return False throttling_retry = retry( retry=_retry_if_throttling, wait=wait_exponential(exp_base=10), stop=stop_after_attempt(max_attempt_number=5), before_sleep=before_sleep_log(logger, logging.WARNING), reraise=True, ) @throttling_retry def download( url: str, dest: str, hashes: Dict = {}, filename: Optional[str] = None, auth: Optional[Tuple[str, str]] = None, extra_request_headers: Optional[Dict[str, str]] = None, ) -> Tuple[str, Dict]: """Download a remote tarball from url, uncompresses and computes swh hashes on it. Args: url: Artifact uri to fetch, uncompress and hash dest: Directory to write the archive to hashes: Dict of expected hashes (key is the hash algo) for the artifact to download (those hashes are expected to be hex string) auth: Optional tuple of login/password (for http authentication service, e.g. deposit) Raises: ValueError in case of any error when fetching/computing (length, checksums mismatched...) Returns: Tuple of local (filepath, hashes of filepath) """ params = copy.deepcopy(DEFAULT_PARAMS) if auth is not None: params["auth"] = auth if extra_request_headers is not None: params["headers"].update(extra_request_headers) # so the connection does not hang indefinitely (read/connection timeout) timeout = params.get("timeout", 60) if url.startswith("ftp://"): response = urlopen(url, timeout=timeout) chunks = (response.read(HASH_BLOCK_SIZE) for _ in itertools.count()) response_data = itertools.takewhile(bool, chunks) else: response = requests.get(url, **params, timeout=timeout, stream=True) response.raise_for_status() # update URL to response one as requests follow redirection by default # on GET requests url = response.url # try to extract filename from content-disposition header if available if filename is None and "content-disposition" in response.headers: filename = _content_disposition_filename( response.headers["content-disposition"] ) response_data = response.iter_content(chunk_size=HASH_BLOCK_SIZE) filename = filename if filename else os.path.basename(urlsplit(url).path) logger.debug("filename: %s", filename) filepath = os.path.join(dest, filename) logger.debug("filepath: %s", filepath) h = MultiHash(hash_names=DOWNLOAD_HASHES | set(hashes.keys())) with open(filepath, "wb") as f: for chunk in response_data: h.update(chunk) f.write(chunk) response.close() # Also check the expected hashes if provided if hashes: actual_hashes = h.hexdigest() for algo_hash in hashes.keys(): actual_digest = actual_hashes[algo_hash] expected_digest = hashes[algo_hash] if actual_digest != expected_digest: raise ValueError( "Failure when fetching %s. 
" "Checksum mismatched: %s != %s" % (url, expected_digest, actual_digest) ) computed_hashes = h.hexdigest() length = computed_hashes.pop("length") extrinsic_metadata = { "length": length, "filename": filename, "checksums": computed_hashes, "url": url, } logger.debug("extrinsic_metadata", extrinsic_metadata) return filepath, extrinsic_metadata @throttling_retry -def api_info(url: str, **extra_params) -> bytes: - """Basic api client to retrieve information, typically JSON metadata, - on software package. +def get_url_body(url: str, **extra_params) -> bytes: + """Basic HTTP client to retrieve information on software package, + typically JSON metadata from a REST API. Args: - url (str): The api url (e.g PyPI, npm, etc...) + url (str): An HTTP URL Raises: NotFound in case of query failures (for some reasons: 404, ...) Returns: The associated response's information """ logger.debug("Fetching %s", url) response = requests.get(url, **{**DEFAULT_PARAMS, **extra_params}) if response.status_code == 404: raise NotFound(f"Fail to query '{url}'. Reason: {response.status_code}") response.raise_for_status() return response.content def release_name(version: str, filename: Optional[str] = None) -> str: if filename: return "releases/%s/%s" % (version, filename) return "releases/%s" % version TReturn = TypeVar("TReturn") TSelf = TypeVar("TSelf") _UNDEFINED = object() def cached_method(f: Callable[[TSelf], TReturn]) -> Callable[[TSelf], TReturn]: cache_name = f"_cached_{f.__name__}" @functools.wraps(f) def newf(self): value = getattr(self, cache_name, _UNDEFINED) if value is _UNDEFINED: value = f(self) setattr(self, cache_name, value) return value return newf