Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/nixguix/loader.py
| Show First 20 Lines • Show All 57 Lines • ▼ Show 20 Lines | class NixGuixLoader(PackageLoader[NixGuixPackageInfo]): | ||||
| sources used by functional package manager (eg. Nix and Guix). | sources used by functional package manager (eg. Nix and Guix). | ||||
| """ | """ | ||||
| visit_type = "nixguix" | visit_type = "nixguix" | ||||
| def __init__(self, url): | def __init__(self, url): | ||||
| super().__init__(url=url) | super().__init__(url=url) | ||||
| unsupported_file_extensions = self.config.get("unsupported_file_extensions", []) | |||||
vlorentz: s/archive/file/, because they are also extensions of non-archives | |||||
| Done Inline Actions — ardumont: oh right! | |||||
| self.raw_sources = retrieve_sources(url) | self.raw_sources = retrieve_sources(url) | ||||
| clean = clean_sources(parse_sources(self.raw_sources)) | clean = clean_sources( | ||||
| parse_sources(self.raw_sources), unsupported_file_extensions | |||||
| ) | |||||
| self.sources = clean["sources"] | self.sources = clean["sources"] | ||||
| self.provider_url = url | self.provider_url = url | ||||
| self._integrityByUrl = {s["urls"][0]: s["integrity"] for s in self.sources} | self._integrityByUrl = {s["urls"][0]: s["integrity"] for s in self.sources} | ||||
| # The revision used to create the sources.json file. For Nix, | # The revision used to create the sources.json file. For Nix, | ||||
| # this revision belongs to the github.com/nixos/nixpkgs | # this revision belongs to the github.com/nixos/nixpkgs | ||||
| # repository | # repository | ||||
| ▲ Show 20 Lines • Show All 133 Lines • ▼ Show 20 Lines | |||||
| def retrieve_sources(url: str) -> bytes: | def retrieve_sources(url: str) -> bytes: | ||||
| return api_info(url, allow_redirects=True) | return api_info(url, allow_redirects=True) | ||||
| def parse_sources(raw_sources: bytes) -> Dict[str, Any]: | def parse_sources(raw_sources: bytes) -> Dict[str, Any]: | ||||
| return json.loads(raw_sources.decode("utf-8")) | return json.loads(raw_sources.decode("utf-8")) | ||||
| # Known unsupported archive so far | def make_pattern_unsupported_file_extension(unsupported_file_extensions: List[str],): | ||||
| PATTERN_KNOWN_UNSUPPORTED_ARCHIVE = re.compile( | """Make a regexp pattern for unsupported file extension out of a list | ||||
| r".*\.(iso|whl|gem|pom|msi|pod|png|rock|ttf|jar|c|rpm|diff|patch)$", re.DOTALL | of unsupported archive extension list. | ||||
| """ | |||||
| return re.compile( | |||||
| rf".*\.({'|'.join(map(re.escape, unsupported_file_extensions))})$", re.DOTALL | |||||
Not Done Inline Actions — vlorentz: `re.escape(unsupported_archive_extensions)`, just in case | |||||
| ) | ) | ||||
| def clean_sources(sources: Dict[str, Any]) -> Dict[str, Any]: | def clean_sources( | ||||
| sources: Dict[str, Any], unsupported_file_extensions=[] | |||||
| ) -> Dict[str, Any]: | |||||
| """Validate and clean the sources structure. First, ensure all top level keys are | """Validate and clean the sources structure. First, ensure all top level keys are | ||||
| present. Then, walk the sources list and remove sources that do not contain required | present. Then, walk the sources list and remove sources that do not contain required | ||||
| keys. | keys. | ||||
| Filter out source entries whose: | Filter out source entries whose: | ||||
| - required keys are missing | - required keys are missing | ||||
| - source type is not supported | - source type is not supported | ||||
| - urls attribute type is not a list | - urls attribute type is not a list | ||||
| - extension is known not to be supported by the loader | |||||
| Raises: | Raises: | ||||
| ValueError if: | ValueError if: | ||||
| - a required top level key is missing | - a required top level key is missing | ||||
| - top-level version is not 1 | - top-level version is not 1 | ||||
| Returns: | Returns: | ||||
| Dict sources | source Dict cleaned up | ||||
| """ | """ | ||||
| pattern_unsupported_file = make_pattern_unsupported_file_extension( | |||||
| unsupported_file_extensions | |||||
| ) | |||||
| # Required top level keys | # Required top level keys | ||||
| required_keys = ["version", "revision", "sources"] | required_keys = ["version", "revision", "sources"] | ||||
| missing_keys = [] | missing_keys = [] | ||||
| for required_key in required_keys: | for required_key in required_keys: | ||||
| if required_key not in sources: | if required_key not in sources: | ||||
| missing_keys.append(required_key) | missing_keys.append(required_key) | ||||
| if missing_keys != []: | if missing_keys != []: | ||||
| Show All 32 Lines | for source in sources["sources"]: | ||||
| logger.info( | logger.info( | ||||
| f"Skip source {source} because the urls attribute is not a list" | f"Skip source {source} because the urls attribute is not a list" | ||||
| ) | ) | ||||
| valid = False | valid = False | ||||
| if valid and len(source["urls"]) > 0: # Filter out unsupported archives | if valid and len(source["urls"]) > 0: # Filter out unsupported archives | ||||
| supported_sources: List[str] = [] | supported_sources: List[str] = [] | ||||
| for source_url in source["urls"]: | for source_url in source["urls"]: | ||||
| if PATTERN_KNOWN_UNSUPPORTED_ARCHIVE.match(source_url): | if pattern_unsupported_file.match(source_url): | ||||
| logger.info(f"Skip unsupported artifact url {source_url}") | logger.info(f"Skip unsupported artifact url {source_url}") | ||||
| continue | continue | ||||
| supported_sources.append(source_url) | supported_sources.append(source_url) | ||||
| if len(supported_sources) == 0: | if len(supported_sources) == 0: | ||||
| logger.info( | logger.info( | ||||
| f"Skip source {source} because urls only reference " | f"Skip source {source} because urls only reference " | ||||
| "unsupported artifacts. Unsupported " | "unsupported artifacts. Unsupported " | ||||
| f"artifacts so far: {PATTERN_KNOWN_UNSUPPORTED_ARCHIVE}" | f"artifacts so far: {pattern_unsupported_file}" | ||||
| ) | ) | ||||
| continue | continue | ||||
| new_source = copy.deepcopy(source) | new_source = copy.deepcopy(source) | ||||
| new_source["urls"] = supported_sources | new_source["urls"] = supported_sources | ||||
| verified_sources.append(new_source) | verified_sources.append(new_source) | ||||
| sources["sources"] = verified_sources | sources["sources"] = verified_sources | ||||
| return sources | return sources | ||||
s/archive/file/
because they are also extensions of non-archives