diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,10 +1,13 @@
 import collections
+from typing import DefaultDict, Dict, Set, Type
 
 import click
+from typing_extensions import Final
 
 from . import cff, codemeta, maven, npm, python, ruby
+from .base import BaseMapping
 
-MAPPINGS = {
+MAPPINGS: Final[Dict[str, Type[BaseMapping]]] = {
     "CodemetaMapping": codemeta.CodemetaMapping,
     "MavenMapping": maven.MavenMapping,
     "NpmMapping": npm.NpmMapping,
@@ -14,7 +17,7 @@
 }
 
 
-def list_terms():
+def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]:
     """Returns a dictionary with all supported CodeMeta terms as keys,
     and the mappings that support each of them as values."""
     d = collections.defaultdict(set)
@@ -27,7 +30,7 @@
 @click.command()
 @click.argument("mapping_name")
 @click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str) -> None:
     from pprint import pprint
 
     with open(file_name, "rb") as fd:
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,11 +5,25 @@
 import json
 import logging
-from typing import List
+from typing import Dict, Iterable, List, Optional
+
+from typing_extensions import TypedDict
 
 from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
 
 
+class FileEntry(TypedDict):
+    name: bytes
+    sha1: bytes
+    sha1_git: bytes
+    target: bytes
+    length: int
+    status: str
+    type: str
+    perms: int
+    dir_id: bytes
+
+
 class BaseMapping:
     """Base class for mappings to inherit from
 
     To implement a new mapping:
@@ -19,35 +33,40 @@
     - override translate function
     """
 
-    def __init__(self, log_suffix=""):
+    def __init__(self, log_suffix: str = ""):
         self.log_suffix = log_suffix
         self.log = logging.getLogger(
             "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
         )
 
     @property
-    def name(self):
+    def name(self) -> str:
         """A name of this mapping, used as an identifier in the
         indexer storage."""
         raise NotImplementedError(f"{self.__class__.__name__}.name")
 
     @classmethod
-    def detect_metadata_files(cls, files):
+    def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
         """
         Detects files potentially containing metadata
 
         Args:
-            file_entries (list): list of files
+            file_entries: list of files
 
         Returns:
             list: list of sha1 (possibly empty)
         """
         raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
 
-    def translate(self, file_content):
+    @classmethod
+    def supported_terms(cls) -> Iterable[str]:
+        """Returns all CodeMeta terms this mapping supports"""
+        raise NotImplementedError(f"{cls.__name__}.supported_terms")
+
+    def translate(self, file_content: bytes) -> Optional[Dict]:
         raise NotImplementedError(f"{self.__class__.__name__}.translate")
 
-    def normalize_translation(self, metadata):
+    def normalize_translation(self, metadata: Dict) -> Dict:
         return compact(metadata)
 
 
@@ -55,14 +74,14 @@
     """Base class for all mappings that use a single file as input."""
 
     @property
-    def filename(self):
+    def filename(self) -> bytes:
         """The .json file to extract metadata from."""
         raise NotImplementedError(f"{self.__class__.__name__}.filename")
 
     @classmethod
-    def detect_metadata_files(cls, file_entries):
+    def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
         for entry in file_entries:
-            if entry["name"].lower() == cls.filename.lower():
+            if entry["name"].lower() == cls.filename.lower():  # type: ignore
entry["name"].lower() == cls.filename.lower(): # type: ignore return [entry["sha1"]] return [] @@ -71,36 +90,36 @@ """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" - string_fields = [] # type: List[str] + string_fields: List[str] = [] """List of fields that are simple strings, and don't need any normalization.""" @property - def mapping(self): + def mapping(self) -> Dict[str, str]: """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod - def _normalize_method_name(name): + def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod - def supported_terms(cls): + def supported_terms(cls) -> Iterable[str]: return { term - for (key, term) in cls.mapping.items() + for (key, term) in cls.mapping.items() # type: ignore if key in cls.string_fields or hasattr(cls, "translate_" + cls._normalize_method_name(key)) or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } - def _translate_dict(self, content_dict, *, normalize=True): + def _translate_dict(self, content_dict: Dict, *, normalize: bool = True) -> Dict: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: - content_dict (dict): content dict to translate + content_dict: content dict to translate Returns: dict: translated metadata in json-friendly form needed for @@ -150,13 +169,13 @@ class JsonMapping(DictMapping, SingleFileMapping): """Base class for all mappings that use a JSON file as input.""" - def translate(self, raw_content): + def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: - raw_content (bytes): raw content to translate + raw_content: raw content to translate Returns: dict: translated metadata in json-friendly form needed for @@ -164,14 +183,15 @@ """ try: - raw_content = raw_content.decode() + content: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) - return + return None try: - content_dict = json.loads(raw_content) + content_dict = json.loads(content) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) - return + return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) + return None diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py --- a/swh/indexer/metadata_dictionary/cff.py +++ b/swh/indexer/metadata_dictionary/cff.py @@ -1,3 +1,5 @@ +from typing import Any, Dict, List, Optional + import yaml from swh.indexer.codemeta import CODEMETA_CONTEXT_URL, CROSSWALK_TABLE, SCHEMA_URI @@ -18,19 +20,19 @@ mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"] string_fields = ["keywords", "license", "abstract", "version", "doi"] - def translate(self, raw_content): - raw_content = raw_content.decode() - content_dict = yaml.load(raw_content, Loader=yaml.SafeLoader) + def translate(self, raw_content: bytes) -> Dict: + content: str = raw_content.decode() + content_dict = yaml.load(content, Loader=yaml.SafeLoader) metadata = self._translate_dict(content_dict) metadata["@context"] = CODEMETA_CONTEXT_URL return metadata - def normalize_authors(self, d): - result = [] + def normalize_authors(self, d) -> Dict[str, Any]: + result: List[Dict[str, Any]] = [] for author 
-            author_data = {"@type": SCHEMA_URI + "Person"}
+            author_data: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"}
             if "orcid" in author:
                 author_data["@id"] = author["orcid"]
             if "affiliation" in author:
@@ -45,21 +47,24 @@
             result.append(author_data)
 
-        result = {"@list": result}
-        return result
+        return {"@list": result}
 
-    def normalize_doi(self, s):
+    def normalize_doi(self, s) -> Optional[Dict[str, str]]:
         if isinstance(s, str):
             return {"@id": "https://doi.org/" + s}
+        return None
 
-    def normalize_license(self, s):
+    def normalize_license(self, s) -> Optional[Dict[str, str]]:
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}
+        return None
 
-    def normalize_repository_code(self, s):
+    def normalize_repository_code(self, s) -> Optional[Dict[str, str]]:
         if isinstance(s, str):
             return {"@id": s}
+        return None
 
-    def normalize_date_released(self, s):
+    def normalize_date_released(self, s) -> Optional[Dict[str, str]]:
         if isinstance(s, str):
             return {"@value": s, "@type": SCHEMA_URI + "Date"}
+        return None
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import json
+from typing import Any, Dict, List, Optional
 
 from swh.indexer.codemeta import CODEMETA_TERMS, expand
 
@@ -20,10 +21,10 @@
     string_fields = None
 
     @classmethod
-    def supported_terms(cls):
+    def supported_terms(cls) -> List[str]:
         return [term for term in CODEMETA_TERMS if not term.startswith("@")]
 
-    def translate(self, content):
+    def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
         try:
             return self.normalize_translation(expand(json.loads(content.decode())))
         except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,6 +4,7 @@
 # See top-level LICENSE file for more information
 
 import os
+from typing import Any, Dict, List, Optional
 import xml.parsers.expat
 
 import xmltodict
@@ -46,7 +47,7 @@
 
     _default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
 
-    def parse_repositories(self, d):
+    def parse_repositories(self, d: Dict) -> Optional[List[Optional[Dict[str, Any]]]]:
         """https://maven.apache.org/pom.html#Repositories
 
         >>> import xmltodict
@@ -75,11 +76,11 @@
             results = []
         return [res for res in results if res] or None
 
-    def parse_repository(self, d, repo):
+    def parse_repository(self, d: Dict, repo) -> Optional[Dict[str, Any]]:
         if not isinstance(repo, dict):
-            return
+            return None
         if repo.get("layout", "default") != "default":
-            return  # TODO ?
+            return None  # TODO ?
url = repo.get("url") group_id = d.get("groupId") artifact_id = d.get("artifactId") @@ -90,8 +91,9 @@ ): repo = os.path.join(url, *group_id.split("."), artifact_id) return {"@id": repo} + return None - def normalize_groupId(self, id_): + def normalize_groupId(self, id_) -> Optional[Dict[str, Any]]: """https://maven.apache.org/pom.html#Maven_Coordinates >>> MavenMapping().normalize_groupId('org.example') @@ -99,8 +101,9 @@ """ if isinstance(id_, str): return {"@id": id_} + return None - def parse_licenses(self, d): + def parse_licenses(self, d) -> Optional[List[Dict[str, Any]]]: """https://maven.apache.org/pom.html#Licenses >>> import xmltodict @@ -148,12 +151,12 @@ licenses = d.get("licenses") if not isinstance(licenses, dict): - return + return None licenses = licenses.get("license") if isinstance(licenses, dict): licenses = [licenses] elif not isinstance(licenses, list): - return + return None return [ {"@id": license["url"]} for license in licenses diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information import re +from typing import Any, Dict, List, Optional from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI @@ -29,7 +30,7 @@ # 'bitbucket': 'https://bitbucket.org/', } - def normalize_repository(self, d): + def normalize_repository(self, d) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#repository >>> NpmMapping().normalize_repository({ @@ -67,7 +68,7 @@ return {"@id": url} - def normalize_bugs(self, d): + def normalize_bugs(self, d) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ @@ -90,7 +91,7 @@ r"^ *" r"(?P.*?)" r"( +<(?P.*)>)?" r"( +\((?P.*)\))?" 
r" *$" ) - def normalize_author(self, d): + def normalize_author(self, d) -> Optional[Dict[str, Any]]: """https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint @@ -111,7 +112,7 @@ 'http://schema.org/name': 'John Doe', 'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]} """ # noqa - author = {"@type": SCHEMA_URI + "Person"} + author: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"} if isinstance(d, dict): name = d.get("name", None) email = d.get("email", None) @@ -133,7 +134,7 @@ author[SCHEMA_URI + "url"] = {"@id": url} return {"@list": [author]} - def normalize_license(self, s): + def normalize_license(self, s) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') @@ -141,8 +142,9 @@ """ if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} + return None - def normalize_homepage(self, s): + def normalize_homepage(self, s) -> Optional[Dict[str, str]]: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_homepage('https://example.org/~john.doe') @@ -150,8 +152,9 @@ """ if isinstance(s, str): return {"@id": s} + return None - def normalize_keywords(self, lst): + def normalize_keywords(self, lst: List) -> Optional[List[str]]: """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) @@ -159,3 +162,4 @@ """ if isinstance(lst, list): return [x for x in lst if isinstance(x, str)] + return None diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py --- a/swh/indexer/metadata_dictionary/python.py +++ b/swh/indexer/metadata_dictionary/python.py @@ -6,6 +6,7 @@ import email.parser import email.policy import itertools +from typing import Dict, List from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI @@ -44,9 +45,9 @@ _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy()) - def translate(self, content): + def translate(self, content: bytes) -> Dict: msg = self._parser.parsebytes(content) - d = {} + d: Dict[str, List[str]] = {} for (key, value) in msg.items(): key = _normalize_pkginfo_key(key) if value != "UNKNOWN": @@ -66,11 +67,11 @@ } return self.normalize_translation(metadata) - def normalize_home_page(self, urls): + def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]: return [{"@id": url} for url in urls] - def normalize_keywords(self, keywords): + def normalize_keywords(self, keywords: List[str]) -> List[str]: return list(itertools.chain.from_iterable(s.split(" ") for s in keywords)) - def normalize_license(self, licenses): + def normalize_license(self, licenses: str) -> List[Dict[str, str]]: return [{"@id": license} for license in licenses] diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -6,13 +6,14 @@ import ast import itertools import re +from typing import Any, Dict, List, Optional, Union from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI -from .base import DictMapping +from .base import DictMapping, FileEntry -def name_to_person(name): +def name_to_person(name: str) -> Dict[str, str]: return { "@type": SCHEMA_URI + "Person", SCHEMA_URI + "name": name, @@ -28,29 +29,29 @@ _re_spec_entry = re.compile(r"\s*\w+\.(?P\w+)\s*=\s*(?P.*)") @classmethod - def detect_metadata_files(cls, file_entries): + def 
         for entry in file_entries:
             if entry["name"].endswith(b".gemspec"):
                 return [entry["sha1"]]
         return []
 
-    def translate(self, raw_content):
+    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
         try:
-            raw_content = raw_content.decode()
+            content: str = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
-            return
+            return None
 
         # Skip lines before 'Gem::Specification.new'
         lines = itertools.dropwhile(
-            lambda x: not self._re_spec_new.match(x), raw_content.split("\n")
+            lambda x: not self._re_spec_new.match(x), content.split("\n")
         )
         try:
             next(lines)  # Consume 'Gem::Specification.new'
         except StopIteration:
             self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
-            return
+            return None
 
         content_dict = {}
         for line in lines:
@@ -61,7 +62,7 @@
                 content_dict[match.group("key")] = value
         return self._translate_dict(content_dict)
 
-    def eval_ruby_expression(self, expr):
+    def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List]]:
         """Very simple evaluator of Ruby expressions.
 
         >>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -97,31 +98,36 @@
             # of such strings).
             tree = ast.parse(expr, mode="eval")
         except (SyntaxError, ValueError):
-            return
+            return None
         if isinstance(tree, ast.Expression):
             return evaluator(tree.body)
+        return None
 
-    def normalize_homepage(self, s):
+    def normalize_homepage(self, s) -> Optional[Dict[str, str]]:
         if isinstance(s, str):
             return {"@id": s}
+        return None
 
-    def normalize_license(self, s):
+    def normalize_license(self, s) -> Optional[List[Dict[str, str]]]:
         if isinstance(s, str):
             return [{"@id": "https://spdx.org/licenses/" + s}]
+        return None
 
-    def normalize_licenses(self, licenses):
+    def normalize_licenses(self, licenses) -> Optional[List[Dict[str, str]]]:
         if isinstance(licenses, list):
             return [
                 {"@id": "https://spdx.org/licenses/" + license}
                 for license in licenses
                 if isinstance(license, str)
             ]
+        return None
 
-    def normalize_author(self, author):
+    def normalize_author(self, author) -> Optional[Dict[str, Any]]:
        if isinstance(author, str):
            return {"@list": [name_to_person(author)]}
+        return None
 
-    def normalize_authors(self, authors):
+    def normalize_authors(self, authors) -> Optional[Dict[str, List[Dict[str, Any]]]]:
         if isinstance(authors, list):
             return {
                 "@list": [
@@ -130,3 +136,4 @@
                     if isinstance(author, str)
                 ]
             }
+        return None