diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py --- a/swh/indexer/metadata_dictionary/__init__.py +++ b/swh/indexer/metadata_dictionary/__init__.py @@ -1,4 +1,6 @@ import collections +from types import ModuleType +from typing import DefaultDict, Set import click @@ -13,20 +15,20 @@ } -def list_terms(): +def list_terms() -> DefaultDict[str, Set[ModuleType]]: """Returns a dictionary with all supported CodeMeta terms as keys, and the mappings that support each of them as values.""" - d = collections.defaultdict(set) + d: DefaultDict[str, Set[ModuleType]] = collections.defaultdict(set) for mapping in MAPPINGS.values(): - for term in mapping.supported_terms(): - d[term].add(mapping) + for term in mapping.supported_terms(): # type: ignore + d[term].add(mapping) # type: ignore return d @click.command() @click.argument("mapping_name") @click.argument("file_name") -def main(mapping_name, file_name): +def main(mapping_name: str, file_name: str) -> None: from pprint import pprint with open(file_name, "rb") as fd: diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -5,7 +5,7 @@ import json import logging -from typing import List +from typing import Any, Dict, List, Set from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values @@ -19,7 +19,7 @@ - override translate function """ - def __init__(self, log_suffix=""): + def __init__(self, log_suffix: str = ""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) @@ -32,7 +32,7 @@ raise NotImplementedError(f"{self.__class__.__name__}.name") @classmethod - def detect_metadata_files(cls, files): + def detect_metadata_files(cls, files: List) -> List[str]: """ Detects files potentially containing metadata @@ -44,10 +44,10 @@ """ raise 
NotImplementedError(f"{cls.__name__}.detect_metadata_files") - def translate(self, file_content): + def translate(self, file_content: bytes) -> Any: raise NotImplementedError(f"{self.__class__.__name__}.translate") - def normalize_translation(self, metadata): + def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata) @@ -60,9 +60,9 @@ raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod - def detect_metadata_files(cls, file_entries): + def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[str]: for entry in file_entries: - if entry["name"].lower() == cls.filename.lower(): + if entry["name"].lower() == cls.filename.lower(): # type: ignore return [entry["sha1"]] return [] @@ -71,7 +71,7 @@ """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" - string_fields = [] # type: List[str] + string_fields: List[str] = [] """List of fields that are simple strings, and don't need any normalization.""" @@ -81,20 +81,20 @@ raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod - def _normalize_method_name(name): + def _normalize_method_name(name: str): return name.replace("-", "_") @classmethod - def supported_terms(cls): + def supported_terms(cls) -> Set[str]: return { term - for (key, term) in cls.mapping.items() + for (key, term) in cls.mapping.items() # type: ignore if key in cls.string_fields or hasattr(cls, "translate_" + cls._normalize_method_name(key)) or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } - def _translate_dict(self, content_dict, *, normalize=True): + def _translate_dict(self, content_dict: Dict, *, normalize=True) -> Dict: """ Translates content by parsing content from a dict object and translating with the appropriate mapping @@ -150,7 +150,7 @@ class JsonMapping(DictMapping, SingleFileMapping): """Base class for all mappings that use a JSON file as input.""" - def 
translate(self, raw_content): + def translate(self, raw_content: bytes) -> Any: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping @@ -164,7 +164,7 @@ """ try: - raw_content = raw_content.decode() + raw_content = raw_content.decode() # type: ignore except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py --- a/swh/indexer/metadata_dictionary/codemeta.py +++ b/swh/indexer/metadata_dictionary/codemeta.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information import json +from typing import Any, Dict, List, Optional from swh.indexer.codemeta import CODEMETA_TERMS, expand @@ -20,10 +21,10 @@ string_fields = None @classmethod - def supported_terms(cls): + def supported_terms(cls) -> List[str]: return [term for term in CODEMETA_TERMS if not term.startswith("@")] - def translate(self, content): + def translate(self, content: bytes) -> Optional[Dict[str, Any]]: try: return self.normalize_translation(expand(json.loads(content.decode()))) except Exception: diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py --- a/swh/indexer/metadata_dictionary/maven.py +++ b/swh/indexer/metadata_dictionary/maven.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information import os +from typing import Any, Dict, List, Optional import xml.parsers.expat import xmltodict @@ -23,7 +24,7 @@ mapping = CROSSWALK_TABLE["Java (Maven)"] string_fields = ["name", "version", "description", "email"] - def translate(self, content): + def translate(self, content: bytes) -> Optional[Dict[str, Any]]: try: d = xmltodict.parse(content).get("project") or {} except xml.parsers.expat.ExpatError: @@ -43,7 +44,7 @@ _default_repository = {"url": "https://repo.maven.apache.org/maven2/"} - def parse_repositories(self, d): + def 
parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Any]]: """https://maven.apache.org/pom.html#Repositories >>> import xmltodict @@ -72,11 +73,11 @@ results = [] return [res for res in results if res] or None - def parse_repository(self, d, repo): + def parse_repository(self, d: Dict[str, Any], repo: Dict[str, Any]) -> Any: if not isinstance(repo, dict): return if repo.get("layout", "default") != "default": - return # TODO ? + return None # TODO ? url = repo.get("url") group_id = d.get("groupId") artifact_id = d.get("artifactId") @@ -85,10 +86,9 @@ and isinstance(group_id, str) and isinstance(artifact_id, str) ): - repo = os.path.join(url, *group_id.split("."), artifact_id) - return {"@id": repo} + return {"@id": os.path.join(url, *group_id.split("."), artifact_id)} - def normalize_groupId(self, id_): + def normalize_groupId(self, id_: str) -> Dict[str, str]: """https://maven.apache.org/pom.html#Maven_Coordinates >>> MavenMapping().normalize_groupId('org.example') @@ -97,7 +97,7 @@ if isinstance(id_, str): return {"@id": id_} - def parse_licenses(self, d): + def parse_licenses(self, d: Dict[str, Any]) -> Optional[List[Dict[str, str]]]: """https://maven.apache.org/pom.html#Licenses >>> import xmltodict @@ -145,12 +145,12 @@ licenses = d.get("licenses") if not isinstance(licenses, dict): - return + return None licenses = licenses.get("license") if isinstance(licenses, dict): licenses = [licenses] elif not isinstance(licenses, list): - return + return None return [ {"@id": license["url"]} for license in licenses diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information import re +from typing import Any, Dict, List, Optional, Union from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI @@ -29,7 +30,9 @@ # 'bitbucket': 'https://bitbucket.org/', } - def 
normalize_repository(self, d): + def normalize_repository( + self, d: Union[Dict, str, Any] + ) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#repository >>> NpmMapping().normalize_repository({ @@ -67,7 +70,7 @@ return {"@id": url} - def normalize_bugs(self, d): + def normalize_bugs(self, d: Union[Dict, str, Any]) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ @@ -90,7 +93,9 @@ r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$" ) - def normalize_author(self, d): + def normalize_author( + self, d: Union[Dict, str, Any] + ) -> Optional[Dict[str, List[Dict[str, Any]]]]: """https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint @@ -130,10 +135,10 @@ if email and isinstance(email, str): author[SCHEMA_URI + "email"] = email if url and isinstance(url, str): - author[SCHEMA_URI + "url"] = {"@id": url} + author[SCHEMA_URI + "url"] = {"@id": url} # type: ignore return {"@list": [author]} - def normalize_license(self, s): + def normalize_license(self, s: str) -> Any: """https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') @@ -142,7 +147,7 @@ if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} - def normalize_homepage(self, s): + def normalize_homepage(self, s: str) -> Any: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_homepage('https://example.org/~john.doe') @@ -151,7 +156,7 @@ if isinstance(s, str): return {"@id": s} - def normalize_keywords(self, lst): + def normalize_keywords(self, lst: List[str]) -> Any: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py --- a/swh/indexer/metadata_dictionary/python.py +++ b/swh/indexer/metadata_dictionary/python.py @@ -6,6 +6,7 @@ import 
email.parser import email.policy import itertools +from typing import Any, Dict, List from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI @@ -15,11 +16,11 @@ class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy): - def header_fetch_parse(self, name, value): + def header_fetch_parse(self, name: str, value: str) -> str: if hasattr(value, "name"): return value value = value.replace("\n ", "\n") - return self.header_factory(name, value) + return self.header_factory(name, value) # type: ignore class PythonPkginfoMapping(DictMapping, SingleFileMapping): @@ -44,9 +45,9 @@ _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy()) - def translate(self, content): + def translate(self, content: bytes) -> Dict[str, Any]: msg = self._parser.parsebytes(content) - d = {} + d: Dict[str, List[str]] = {} for (key, value) in msg.items(): key = _normalize_pkginfo_key(key) if value != "UNKNOWN": @@ -66,11 +67,11 @@ } return self.normalize_translation(metadata) - def normalize_home_page(self, urls): + def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]: return [{"@id": url} for url in urls] - def normalize_keywords(self, keywords): + def normalize_keywords(self, keywords: List[str]) -> List[str]: return list(itertools.chain.from_iterable(s.split(" ") for s in keywords)) - def normalize_license(self, licenses): + def normalize_license(self, licenses: List[str]) -> List[Dict[str, str]]: return [{"@id": license} for license in licenses] diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -6,13 +6,14 @@ import ast import itertools import re +from typing import Any, Dict, List, Optional from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI from .base import DictMapping -def name_to_person(name): +def name_to_person(name: str) -> Dict[str, str]: return { "@type": SCHEMA_URI + "Person", 
SCHEMA_URI + "name": name, @@ -28,18 +29,18 @@ _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)") @classmethod - def detect_metadata_files(cls, file_entries): + def detect_metadata_files(cls: Any, file_entries: Any) -> List[str]: for entry in file_entries: if entry["name"].endswith(b".gemspec"): return [entry["sha1"]] return [] - def translate(self, raw_content): + def translate(self, raw_content: Any) -> Optional[Dict[str, str]]: try: raw_content = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) - return + return None # Skip lines before 'Gem::Specification.new' lines = itertools.dropwhile( @@ -50,7 +51,7 @@ next(lines) # Consume 'Gem::Specification.new' except StopIteration: self.log.warning("Could not find Gem::Specification in %s", self.log_suffix) - return + return None content_dict = {} for line in lines: @@ -61,7 +62,7 @@ content_dict[match.group("key")] = value return self._translate_dict(content_dict) - def eval_ruby_expression(self, expr): + def eval_ruby_expression(self, expr: str) -> Any: """Very simple evaluator of Ruby expressions. >>> GemspecMapping().eval_ruby_expression('"Foo bar"') @@ -97,19 +98,19 @@ # of such strings). 
tree = ast.parse(expr, mode="eval") except (SyntaxError, ValueError): - return + return None if isinstance(tree, ast.Expression): return evaluator(tree.body) - def normalize_homepage(self, s): + def normalize_homepage(self, s: str) -> Dict[str, str]: if isinstance(s, str): return {"@id": s} - def normalize_license(self, s): + def normalize_license(self, s: str) -> List[Dict[str, str]]: if isinstance(s, str): return [{"@id": "https://spdx.org/licenses/" + s}] - def normalize_licenses(self, licenses): + def normalize_licenses(self, licenses: List[str]) -> Any: if isinstance(licenses, list): return [ {"@id": "https://spdx.org/licenses/" + license} @@ -117,11 +118,11 @@ if isinstance(license, str) ] - def normalize_author(self, author): + def normalize_author(self, author: str) -> Any: if isinstance(author, str): return {"@list": [name_to_person(author)]} - def normalize_authors(self, authors): + def normalize_authors(self, authors: List[str]) -> Any: if isinstance(authors, list): return { "@list": [