diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py --- a/swh/indexer/metadata_dictionary/__init__.py +++ b/swh/indexer/metadata_dictionary/__init__.py @@ -1,10 +1,10 @@ import collections -from types import ModuleType -from typing import DefaultDict, Set +from typing import DefaultDict, Set, Type import click from . import codemeta, maven, npm, python, ruby +from .base import BaseMapping MAPPINGS = { "CodemetaMapping": codemeta.CodemetaMapping, @@ -15,13 +15,13 @@ } -def list_terms() -> DefaultDict[str, Set[ModuleType]]: +def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]: """Returns a dictionary with all supported CodeMeta terms as keys, and the mappings that support each of them as values.""" - d: DefaultDict[str, Set[ModuleType]] = collections.defaultdict(set) + d: DefaultDict[str, Set[Type[BaseMapping]]] = collections.defaultdict(set) for mapping in MAPPINGS.values(): - for term in mapping.supported_terms(): # type: ignore - d[term].add(mapping) # type: ignore + for term in mapping.supported_terms(): # type:ignore [attr-defined] + d[term].add(mapping) return d diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -5,11 +5,26 @@ import json import logging -from typing import Any, Dict, List, Set +from typing import Any, Dict, List, Optional, Set + +from typing_extensions import TypedDict from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values +class File_entries(TypedDict): + name: bytes + type: str + dir_id: bytes + sha1_git: Optional[bytes] + target: Optional[bytes] + length: Optional[int] + status: Optional[str] + perms: Optional[int] + sha1: bytes + sha256: Optional[bytes] + + class BaseMapping: """Base class for mappings to inherit from @@ -32,7 +47,7 @@ raise NotImplementedError(f"{self.__class__.__name__}.name") @classmethod - def 
detect_metadata_files(cls, files: List) -> List[str]: + def detect_metadata_files(cls, files: List[File_entries]) -> List[bytes]: """ Detects files potentially containing metadata @@ -44,7 +59,7 @@ """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") - def translate(self, file_content: bytes) -> Any: + def translate(self, file_content: bytes) -> Optional[Dict[str, Any]]: raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: @@ -60,10 +75,11 @@ raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod - def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[str]: + def detect_metadata_files(cls, file_entries: List[File_entries]) -> List[bytes]: for entry in file_entries: - if entry["name"].lower() == cls.filename.lower(): # type: ignore - return [entry["sha1"]] + if isinstance(entry["name"], bytes) and isinstance(cls.filename, bytes): + if entry["name"].lower() == cls.filename.lower(): + return [entry["sha1"]] return [] @@ -81,20 +97,24 @@ raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod - def _normalize_method_name(name: str): + def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls) -> Set[str]: - return { - term - for (key, term) in cls.mapping.items() # type: ignore - if key in cls.string_fields - or hasattr(cls, "translate_" + cls._normalize_method_name(key)) - or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) - } - - def _translate_dict(self, content_dict: Dict, *, normalize=True) -> Dict: + if isinstance(cls.mapping, Dict): + return { + term + for (key, term) in cls.mapping.items() + if key in cls.string_fields + or hasattr(cls, "translate_" + cls._normalize_method_name(key)) + or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) + } + return set() + + def _translate_dict( + self, content_dict: Dict[str, 
Any], *, normalize=True + ) -> Dict[str, Any]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping @@ -150,7 +170,7 @@ class JsonMapping(DictMapping, SingleFileMapping): """Base class for all mappings that use a JSON file as input.""" - def translate(self, raw_content: bytes) -> Any: + def translate(self, raw_content_bytes: bytes) -> Optional[Dict[str, Any]]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping @@ -164,14 +184,16 @@ """ try: - raw_content = raw_content.decode() # type: ignore + raw_content = raw_content_bytes.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) - return + return None try: content_dict = json.loads(raw_content) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) - return + return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) + else: + return None diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py --- a/swh/indexer/metadata_dictionary/codemeta.py +++ b/swh/indexer/metadata_dictionary/codemeta.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information import json -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from swh.indexer.codemeta import CODEMETA_TERMS, expand @@ -24,7 +24,11 @@ def supported_terms(cls) -> List[str]: return [term for term in CODEMETA_TERMS if not term.startswith("@")] - def translate(self, content: bytes) -> Optional[Dict[str, Any]]: + def translate( + self, content: bytes + ) -> Optional[ + Dict[str, Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]]] + ]: try: return self.normalize_translation(expand(json.loads(content.decode()))) except Exception: diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py --- 
a/swh/indexer/metadata_dictionary/maven.py +++ b/swh/indexer/metadata_dictionary/maven.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for more information import os -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple, Union import xml.parsers.expat import xmltodict @@ -24,7 +24,9 @@ mapping = CROSSWALK_TABLE["Java (Maven)"] string_fields = ["name", "version", "description", "email"] - def translate(self, content: bytes) -> Optional[Dict[str, Any]]: + def translate( + self, content: bytes + ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]: try: d = xmltodict.parse(content).get("project") or {} except xml.parsers.expat.ExpatError: @@ -44,7 +46,7 @@ _default_repository = {"url": "https://repo.maven.apache.org/maven2/"} - def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Any]]: + def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Dict[str, str]]]: """https://maven.apache.org/pom.html#Repositories >>> import xmltodict @@ -73,9 +75,11 @@ results = [] return [res for res in results if res] or None - def parse_repository(self, d: Dict[str, Any], repo: Dict[str, Any]) -> Any: + def parse_repository( + self, d: Dict[str, Any], repo: Dict[str, Any] + ) -> Optional[Dict[str, str]]: if not isinstance(repo, dict): - return + return None if repo.get("layout", "default") != "default": return None # TODO ? 
url = repo.get("url") @@ -87,6 +91,8 @@ and isinstance(artifact_id, str) ): return {"@id": os.path.join(url, *group_id.split("."), artifact_id)} + else: + return None def normalize_groupId(self, id_: str) -> Dict[str, str]: """https://maven.apache.org/pom.html#Maven_Coordinates diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -31,7 +31,7 @@ } def normalize_repository( - self, d: Union[Dict, str, Any] + self, d: Union[Dict[str, Any], str] ) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#repository @@ -70,7 +70,7 @@ return {"@id": url} - def normalize_bugs(self, d: Union[Dict, str, Any]) -> Optional[Dict[str, str]]: + def normalize_bugs(self, d: Union[Dict, str]) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ @@ -94,8 +94,8 @@ ) def normalize_author( - self, d: Union[Dict, str, Any] - ) -> Optional[Dict[str, List[Dict[str, Any]]]]: + self, d: Union[Dict, str] + ) -> Optional[Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]: """https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint @@ -116,7 +116,7 @@ 'http://schema.org/name': 'John Doe', 'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]} """ # noqa - author = {"@type": SCHEMA_URI + "Person"} + author: Dict[str, Union[str, Dict[str, str]]] = {"@type": SCHEMA_URI + "Person"} if isinstance(d, dict): name = d.get("name", None) email = d.get("email", None) @@ -135,10 +135,10 @@ if email and isinstance(email, str): author[SCHEMA_URI + "email"] = email if url and isinstance(url, str): - author[SCHEMA_URI + "url"] = {"@id": url} # type: ignore + author[SCHEMA_URI + "url"] = {"@id": url} return {"@list": [author]} - def normalize_license(self, s: str) -> Any: + def normalize_license(self, s: str) -> Dict[str, str]: 
"""https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') @@ -147,7 +147,7 @@ if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} - def normalize_homepage(self, s: str) -> Any: + def normalize_homepage(self, s: str) -> Dict[str, str]: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_homepage('https://example.org/~john.doe') @@ -156,7 +156,7 @@ if isinstance(s, str): return {"@id": s} - def normalize_keywords(self, lst: List[str]) -> Any: + def normalize_keywords(self, lst: List[str]) -> Optional[List[str]]: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) @@ -164,3 +164,5 @@ """ if isinstance(lst, list): return [x for x in lst if isinstance(x, str)] + else: + return None diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py --- a/swh/indexer/metadata_dictionary/python.py +++ b/swh/indexer/metadata_dictionary/python.py @@ -6,7 +6,7 @@ import email.parser import email.policy import itertools -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple, Union from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI @@ -45,7 +45,9 @@ _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy()) - def translate(self, content: bytes) -> Dict[str, Any]: + def translate( + self, content: bytes + ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]: msg = self._parser.parsebytes(content) d: Dict[str, List[str]] = {} for (key, value) in msg.items(): diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -6,11 +6,11 @@ import ast import itertools import re -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple, Union from 
swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI -from .base import DictMapping +from .base import DictMapping, File_entries def name_to_person(name: str) -> Dict[str, str]: @@ -29,15 +29,20 @@ _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)") @classmethod - def detect_metadata_files(cls: Any, file_entries: Any) -> List[str]: + def detect_metadata_files( + cls: Any, file_entries: List[File_entries] + ) -> List[bytes]: for entry in file_entries: - if entry["name"].endswith(b".gemspec"): - return [entry["sha1"]] + if isinstance(entry["name"], bytes): + if entry["name"].endswith(b".gemspec"): + return [entry["sha1"]] return [] - def translate(self, raw_content: Any) -> Optional[Dict[str, str]]: + def translate( + self, raw_content_bytes: bytes + ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]: try: - raw_content = raw_content.decode() + raw_content = raw_content_bytes.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None @@ -53,7 +58,7 @@ self.log.warning("Could not find Gem::Specification in %s", self.log_suffix) return None - content_dict = {} + content_dict: Dict[str, Union[str, List[str]]] = {} for line in lines: match = self._re_spec_entry.match(line) if match: @@ -62,7 +67,7 @@ content_dict[match.group("key")] = value return self._translate_dict(content_dict) - def eval_ruby_expression(self, expr: str) -> Any: + def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List[str]]]: """Very simple evaluator of Ruby expressions. 
>>> GemspecMapping().eval_ruby_expression('"Foo bar"') @@ -101,6 +106,8 @@ return None if isinstance(tree, ast.Expression): return evaluator(tree.body) + else: + return None def normalize_homepage(self, s: str) -> Dict[str, str]: if isinstance(s, str): @@ -110,19 +117,22 @@ if isinstance(s, str): return [{"@id": "https://spdx.org/licenses/" + s}] - def normalize_licenses(self, licenses: List[str]) -> Any: + def normalize_licenses(self, licenses: List[str]) -> List[Dict[str, str]]: if isinstance(licenses, list): return [ {"@id": "https://spdx.org/licenses/" + license} for license in licenses if isinstance(license, str) ] + return [] - def normalize_author(self, author: str) -> Any: + def normalize_author(self, author: str) -> Dict[str, List[Dict[str, str]]]: if isinstance(author, str): return {"@list": [name_to_person(author)]} - def normalize_authors(self, authors: List[str]) -> Any: + def normalize_authors( + self, authors: List[str] + ) -> Optional[Dict[str, List[Dict[str, str]]]]: if isinstance(authors, list): return { "@list": [ @@ -131,3 +141,5 @@ if isinstance(author, str) ] } + else: + return None