Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/nixguix/loader.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import attr | |||||
import copy | |||||
import json | import json | ||||
import logging | import logging | ||||
from typing import Any, Dict, Iterator, Mapping, Optional, Tuple | import re | ||||
from typing import Any, Dict, List, Iterator, Mapping, Optional, Tuple | |||||
import attr | |||||
from swh.model import hashutil | from swh.model import hashutil | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
TargetType, | TargetType, | ||||
▲ Show 20 Lines • Show All 187 Lines • ▼ Show 20 Lines | |||||
def retrieve_sources(url: str) -> bytes: | def retrieve_sources(url: str) -> bytes: | ||||
return api_info(url, allow_redirects=True) | return api_info(url, allow_redirects=True) | ||||
def parse_sources(raw_sources: bytes) -> Dict[str, Any]: | def parse_sources(raw_sources: bytes) -> Dict[str, Any]: | ||||
return json.loads(raw_sources.decode("utf-8")) | return json.loads(raw_sources.decode("utf-8")) | ||||
# Known unsupported archive so far | |||||
vlorentz: forgot `tbz2` | |||||
[Not Done] vlorentz: and `.7z` and `.Z`, etc. Look at swh-lister/swh/lister/gnu/tree.py | |||||
PATTERN_KNOWN_UNSUPPORTED_ARCHIVE = re.compile( | |||||
[Not Done] vlorentz: Could you just add a `\.` before the list of extensions? | |||||
[Done] ardumont: just before the parenthesis, as in:
```
r".*\.(iso|whl|gem|pom|msi|pod|png|rock|ttf|jar|c|rpm|diff|patch)$", re.DOTALL
```
right? | |||||
[Not Done] vlorentz: yes | |||||
[Done] ardumont: on its way ;) | |||||
r".*\.(iso|whl|gem|pom|msi|pod|png|rock|ttf|jar|c|rpm|diff|patch)$", re.DOTALL | |||||
[Done, unsubmitted] ardumont (author): We can add 'el' extensions (emacs-lisp) to the list.
Well, I gather all kinds of textual files… | |||||
) | |||||
def clean_sources(sources: Dict[str, Any]) -> Dict[str, Any]: | def clean_sources(sources: Dict[str, Any]) -> Dict[str, Any]: | ||||
"""Validate and clean the sources structure. First, ensure all top level keys are | """Validate and clean the sources structure. First, ensure all top level keys are | ||||
present. Then, walk the sources list and remove sources that do not contain required | present. Then, walk the sources list and remove sources that do not contain required | ||||
keys. | keys. | ||||
Filter out source entries whose: | Filter out source entries whose: | ||||
- required keys are missing | - required keys are missing | ||||
- source type is not supported | - source type is not supported | ||||
Show All 31 Lines | def clean_sources(sources: Dict[str, Any]) -> Dict[str, Any]: | ||||
# skipped but others could still be archived. | # skipped but others could still be archived. | ||||
verified_sources = [] | verified_sources = [] | ||||
for source in sources["sources"]: | for source in sources["sources"]: | ||||
valid = True | valid = True | ||||
required_keys = ["urls", "integrity", "type"] | required_keys = ["urls", "integrity", "type"] | ||||
for required_key in required_keys: | for required_key in required_keys: | ||||
if required_key not in source: | if required_key not in source: | ||||
logger.info( | logger.info( | ||||
"Skip source '%s' because key '%s' is missing", source, required_key | f"Skip source '{source}' because key '{required_key}' is missing", | ||||
) | ) | ||||
valid = False | valid = False | ||||
if valid and source["type"] != "url": | if valid and source["type"] != "url": | ||||
logger.info( | logger.info( | ||||
"Skip source '%s' because the type %s is not supported", | f"Skip source '{source}' because the type {source['type']} " | ||||
source, | "is not supported", | ||||
source["type"], | |||||
) | ) | ||||
valid = False | valid = False | ||||
if valid and not isinstance(source["urls"], list): | if valid and not isinstance(source["urls"], list): | ||||
logger.info( | logger.info( | ||||
"Skip source '%s' because the urls attribute is not a list", source | f"Skip source {source} because the urls attribute is not a list" | ||||
) | ) | ||||
valid = False | valid = False | ||||
if valid: | |||||
verified_sources.append(source) | if valid and len(source["urls"]) > 0: # Filter out unsupported archives | ||||
supported_sources: List[str] = [] | |||||
for source_url in source["urls"]: | |||||
if PATTERN_KNOWN_UNSUPPORTED_ARCHIVE.match(source_url): | |||||
logger.info(f"Skip unsupported artifact url {source_url}") | |||||
continue | |||||
supported_sources.append(source_url) | |||||
if len(supported_sources) == 0: | |||||
logger.info( | |||||
[Not Done] vlorentz: Let's be consistent with string prefixes.
And spaces belong at the end of lines, not the beginning. | |||||
[Done] ardumont: yeah, I have a tendency to split "à l'arrache", sorry, and forget to check back... | |||||
f"Skip source {source} because urls only reference " | |||||
"unsupported artifacts. Unsupported " | |||||
f"artifacts so far: {PATTERN_KNOWN_UNSUPPORTED_ARCHIVE}" | |||||
) | |||||
continue | |||||
new_source = copy.deepcopy(source) | |||||
new_source["urls"] = supported_sources | |||||
verified_sources.append(new_source) | |||||
sources["sources"] = verified_sources | sources["sources"] = verified_sources | ||||
return sources | return sources |
forgot tbz2