diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
index 81df82a..d33bc98 100644
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,40 +1,40 @@
 import collections
 
 import click
 
 from . import cff, codemeta, maven, npm, python, ruby
 
 MAPPINGS = {
     "CodemetaMapping": codemeta.CodemetaMapping,
     "MavenMapping": maven.MavenMapping,
     "NpmMapping": npm.NpmMapping,
     "PythonPkginfoMapping": python.PythonPkginfoMapping,
     "GemspecMapping": ruby.GemspecMapping,
     "CffMapping": cff.CffMapping,
 }
 
 
 def list_terms():
     """Returns a dictionary with all supported CodeMeta terms as keys,
     and the mappings that support each of them as values."""
     d = collections.defaultdict(set)
     for mapping in MAPPINGS.values():
         for term in mapping.supported_terms():
             d[term].add(mapping)
     return d
 
 
 @click.command()
 @click.argument("mapping_name")
 @click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str):
     from pprint import pprint
 
     with open(file_name, "rb") as fd:
         file_content = fd.read()
     res = MAPPINGS[mapping_name]().translate(file_content)
     pprint(res)
 
 
 if __name__ == "__main__":
     main()
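For context on how these annotated entry points are typically exercised, here is a small interactive sketch; the package.json file name and its presence on disk are assumptions for illustration, not part of this patch.

# Minimal usage sketch (hypothetical input file).
from swh.indexer.metadata_dictionary import MAPPINGS, list_terms

with open("package.json", "rb") as fd:  # any metadata file handled by a mapping
    raw = fd.read()

# Same call path as main() above: instantiate a mapping, then translate bytes.
print(MAPPINGS["NpmMapping"]().translate(raw))

# list_terms() maps each supported CodeMeta term to the mappings that emit it.
print(sorted(list_terms().keys())[:5])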
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 0a2becf..774875c 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,177 +1,180 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import json
 import logging
-from typing import List
+from typing import Any, Dict, List, Optional
 
 from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
 
 
 class BaseMapping:
     """Base class for mappings to inherit from
 
     To implement a new mapping:
 
     - inherit this class
     - override translate function
     """
 
     def __init__(self, log_suffix=""):
         self.log_suffix = log_suffix
         self.log = logging.getLogger(
             "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
         )
 
     @property
     def name(self):
         """A name of this mapping, used as an identifier in the
         indexer storage."""
         raise NotImplementedError(f"{self.__class__.__name__}.name")
 
     @classmethod
-    def detect_metadata_files(cls, files):
+    def detect_metadata_files(cls, files: List[Dict[str, str]]) -> List[str]:
         """
         Detects files potentially containing metadata
 
         Args:
             file_entries (list): list of files
 
         Returns:
             list: list of sha1 (possibly empty)
         """
         raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
 
-    def translate(self, file_content):
+    def translate(self, file_content: bytes) -> Optional[Dict]:
         raise NotImplementedError(f"{self.__class__.__name__}.translate")
 
-    def normalize_translation(self, metadata):
+    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata)
 
 
 class SingleFileMapping(BaseMapping):
     """Base class for all mappings that use a single file as input."""
 
     @property
     def filename(self):
         """The .json file to extract metadata from."""
         raise NotImplementedError(f"{self.__class__.__name__}.filename")
 
     @classmethod
-    def detect_metadata_files(cls, file_entries):
+    def detect_metadata_files(cls, file_entries: List[Dict[str, str]]) -> List[str]:
         for entry in file_entries:
-            if entry["name"].lower() == cls.filename:
+            if entry["name"].lower() == cls.filename.lower():
                 return [entry["sha1"]]
         return []
 
 
 class DictMapping(BaseMapping):
     """Base class for mappings that take as input a file that is mostly
     a key-value store (eg. a shallow JSON dict)."""
 
     string_fields = []  # type: List[str]
     """List of fields that are simple strings, and don't need any
     normalization."""
 
     @property
     def mapping(self):
         """A translation dict to map dict keys into a canonical name."""
         raise NotImplementedError(f"{self.__class__.__name__}.mapping")
 
     @staticmethod
-    def _normalize_method_name(name):
+    def _normalize_method_name(name: str) -> str:
         return name.replace("-", "_")
 
     @classmethod
     def supported_terms(cls):
         return {
             term
             for (key, term) in cls.mapping.items()
             if key in cls.string_fields
             or hasattr(cls, "translate_" + cls._normalize_method_name(key))
             or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
         }
 
-    def _translate_dict(self, content_dict, *, normalize=True):
+    def _translate_dict(
+        self, content_dict: Dict, *, normalize: bool = True
+    ) -> Dict[str, str]:
         """
         Translates content by parsing content from a dict object
         and translating with the appropriate mapping
 
         Args:
             content_dict (dict): content dict to translate
 
         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer
         """
         translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}
         for k, v in content_dict.items():
             # First, check if there is a specific translation
             # method for this key
             translation_method = getattr(
                 self, "translate_" + self._normalize_method_name(k), None
             )
             if translation_method:
                 translation_method(translated_metadata, v)
             elif k in self.mapping:
                 # if there is no method, but the key is known from the
                 # crosswalk table
                 codemeta_key = self.mapping[k]
 
                 # if there is a normalization method, use it on the value
                 normalization_method = getattr(
                     self, "normalize_" + self._normalize_method_name(k), None
                 )
                 if normalization_method:
                     v = normalization_method(v)
                 elif k in self.string_fields and isinstance(v, str):
                     pass
                 elif k in self.string_fields and isinstance(v, list):
                     v = [x for x in v if isinstance(x, str)]
                 else:
                     continue
 
                 # set the translation metadata with the normalized value
                 if codemeta_key in translated_metadata:
                     translated_metadata[codemeta_key] = merge_values(
                         translated_metadata[codemeta_key], v
                     )
                 else:
                     translated_metadata[codemeta_key] = v
 
         if normalize:
             return self.normalize_translation(translated_metadata)
         else:
             return translated_metadata
 
 
 class JsonMapping(DictMapping, SingleFileMapping):
     """Base class for all mappings that use a JSON file as input."""
 
-    def translate(self, raw_content):
+    def translate(self, raw_content: bytes) -> Optional[Dict]:
         """
         Translates content by parsing content from a bytestring containing
         json data and translating with the appropriate mapping
 
         Args:
             raw_content (bytes): raw content to translate
 
         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer
         """
         try:
-            raw_content = raw_content.decode()
+            raw_content_string: str = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
-            return
+            return None
         try:
-            content_dict = json.loads(raw_content)
+            content_dict = json.loads(raw_content_string)
         except json.JSONDecodeError:
             self.log.warning("Error unjsoning from %s", self.log_suffix)
-            return
+            return None
         if isinstance(content_dict, dict):
             return self._translate_dict(content_dict)
+        return None
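To see how the annotated base classes compose, a hypothetical mapping could look like the sketch below; ExampleMapping, example.json and its one-entry crosswalk are made up purely to exercise JsonMapping and DictMapping with the new signatures.

# Hypothetical subclass, for illustration only; not part of this patch.
from typing import Dict, Optional

from swh.indexer.codemeta import SCHEMA_URI
from swh.indexer.metadata_dictionary.base import JsonMapping


class ExampleMapping(JsonMapping):
    """Toy mapping for an imaginary example.json metadata file."""

    name = "example"
    filename = b"example.json"
    mapping = {"title": SCHEMA_URI + "name"}  # toy crosswalk table
    string_fields = ["title"]


# translate() accepts bytes and returns Optional[Dict], as annotated above.
result: Optional[Dict] = ExampleMapping().translate(b'{"title": "hello"}')
print(result)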
@@ -1,65 +1,69 @@
+from typing import Dict, List, Optional, Union
+
 import yaml
 
 from swh.indexer.codemeta import CODEMETA_CONTEXT_URL, CROSSWALK_TABLE, SCHEMA_URI
 
 from .base import DictMapping, SingleFileMapping
 
 yaml.SafeLoader.yaml_implicit_resolvers = {
     k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
     for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
 }
 
 
 class CffMapping(DictMapping, SingleFileMapping):
     """Dedicated class for Citation (CITATION.cff) mapping and translation"""
 
     name = "cff"
     filename = b"CITATION.cff"
     mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
     string_fields = ["keywords", "license", "abstract", "version", "doi"]
 
-    def translate(self, raw_content):
-        raw_content = raw_content.decode()
-        content_dict = yaml.load(raw_content, Loader=yaml.SafeLoader)
+    def translate(self, raw_content: bytes) -> Dict[str, str]:
+        raw_content_string: str = raw_content.decode()
+        content_dict = yaml.load(raw_content_string, Loader=yaml.SafeLoader)
         metadata = self._translate_dict(content_dict)
         metadata["@context"] = CODEMETA_CONTEXT_URL
         return metadata
 
-    def normalize_authors(self, d):
+    def normalize_authors(self, d: List[dict]) -> Dict[str, list]:
         result = []
         for author in d:
-            author_data = {"@type": SCHEMA_URI + "Person"}
+            author_data: Dict[str, Optional[Union[str, Dict]]] = {
+                "@type": SCHEMA_URI + "Person"
+            }
             if "orcid" in author:
                 author_data["@id"] = author["orcid"]
             if "affiliation" in author:
                 author_data[SCHEMA_URI + "affiliation"] = {
                     "@type": SCHEMA_URI + "Organization",
                     SCHEMA_URI + "name": author["affiliation"],
                 }
             if "family-names" in author:
                 author_data[SCHEMA_URI + "familyName"] = author["family-names"]
             if "given-names" in author:
                 author_data[SCHEMA_URI + "givenName"] = author["given-names"]
             result.append(author_data)
 
-        result = {"@list": result}
-        return result
+        result_final = {"@list": result}
+        return result_final
 
-    def normalize_doi(self, s):
+    def normalize_doi(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": "https://doi.org/" + s}
 
-    def normalize_license(self, s):
+    def normalize_license(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}
 
-    def normalize_repository_code(self, s):
+    def normalize_repository_code(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": s}
 
-    def normalize_date_released(self, s):
+    def normalize_date_released(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@value": s, "@type": SCHEMA_URI + "Date"}
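A quick round-trip through the annotated CffMapping.translate; the CITATION.cff body below is a minimal, made-up example rather than a file from a real repository.

from swh.indexer.metadata_dictionary.cff import CffMapping

raw = b"""\
cff-version: 1.0.3
authors:
  - family-names: Doe
    given-names: Jane
    orcid: https://orcid.org/0000-0002-1825-0097
license: Apache-2.0
version: 1.0.0
"""
# Returns a dict with "@context" set to the CodeMeta context URL and the
# license/authors values rewritten by the normalize_* methods above.
print(CffMapping().translate(raw))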
+    def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
         try:
             return self.normalize_translation(expand(json.loads(content.decode())))
         except Exception:
             return None
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index f5f6d9b..ad4c5ed 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,161 +1,162 @@
 # Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
+from typing import Any, Dict, Optional
 import xml.parsers.expat
 
 import xmltodict
 
 from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
 
 from .base import DictMapping, SingleFileMapping
 
 
 class MavenMapping(DictMapping, SingleFileMapping):
     """
     dedicated class for Maven (pom.xml) mapping and translation
     """
 
     name = "maven"
     filename = b"pom.xml"
     mapping = CROSSWALK_TABLE["Java (Maven)"]
     string_fields = ["name", "version", "description", "email"]
 
-    def translate(self, content):
+    def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
         try:
             d = xmltodict.parse(content).get("project") or {}
         except xml.parsers.expat.ExpatError:
             self.log.warning("Error parsing XML from %s", self.log_suffix)
             return None
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding XML from %s", self.log_suffix)
             return None
         except (LookupError, ValueError):
             # unknown encoding or multi-byte encoding
             self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
             return None
         if not isinstance(d, dict):
             self.log.warning("Skipping ill-formed XML content: %s", content)
             return None
         metadata = self._translate_dict(d, normalize=False)
         metadata[SCHEMA_URI + "codeRepository"] = self.parse_repositories(d)
         metadata[SCHEMA_URI + "license"] = self.parse_licenses(d)
         return self.normalize_translation(metadata)
 
     _default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
 
     def parse_repositories(self, d):
         """https://maven.apache.org/pom.html#Repositories
 
         >>> import xmltodict
         >>> from pprint import pprint
         >>> d = xmltodict.parse('''
         ... <repositories>
         ...   <repository>
         ...     <id>codehausSnapshots</id>
         ...     <name>Codehaus Snapshots</name>
         ...     <url>http://snapshots.maven.codehaus.org/maven2</url>
         ...     <layout>default</layout>
         ...   </repository>
         ... </repositories>
         ... ''')
         >>> MavenMapping().parse_repositories(d)
         """
         repositories = d.get("repositories")
         if not repositories:
             results = [self.parse_repository(d, self._default_repository)]
         elif isinstance(repositories, dict):
             repositories = repositories.get("repository") or []
             if not isinstance(repositories, list):
                 repositories = [repositories]
             results = [self.parse_repository(d, repo) for repo in repositories]
         else:
             results = []
         return [res for res in results if res] or None
 
     def parse_repository(self, d, repo):
         if not isinstance(repo, dict):
             return
         if repo.get("layout", "default") != "default":
             return  # TODO ?
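And the corresponding check for the annotated CodemetaMapping.translate, which returns None for anything it cannot parse; the inline codemeta.json snippet is a made-up minimal document.

from swh.indexer.metadata_dictionary.codemeta import CodemetaMapping

ok = CodemetaMapping().translate(
    b'{"@context": "https://doi.org/10.5063/schema/codemeta-2.0",'
    b' "name": "example-project", "version": "0.1.0"}'
)
broken = CodemetaMapping().translate(b"not json")
print(ok)      # compacted CodeMeta document
print(broken)  # None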
         url = repo.get("url")
         group_id = d.get("groupId")
         artifact_id = d.get("artifactId")
         if (
             isinstance(url, str)
             and isinstance(group_id, str)
             and isinstance(artifact_id, str)
         ):
             repo = os.path.join(url, *group_id.split("."), artifact_id)
             return {"@id": repo}
 
     def normalize_groupId(self, id_):
         """https://maven.apache.org/pom.html#Maven_Coordinates
 
         >>> MavenMapping().normalize_groupId('org.example')
         {'@id': 'org.example'}
         """
         if isinstance(id_, str):
             return {"@id": id_}
 
     def parse_licenses(self, d):
         """https://maven.apache.org/pom.html#Licenses
 
         >>> import xmltodict
         >>> import json
         >>> d = xmltodict.parse('''
         ... <licenses>
         ...   <license>
         ...     <name>Apache License, Version 2.0</name>
         ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         ...   </license>
         ... </licenses>
         ... ''')
         >>> print(json.dumps(d, indent=4))
         {
             "licenses": {
                 "license": {
                     "name": "Apache License, Version 2.0",
                     "url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
                 }
             }
         }
         >>> MavenMapping().parse_licenses(d)
         [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]
 
         or, if there are more than one license:
 
         >>> import xmltodict
         >>> from pprint import pprint
         >>> d = xmltodict.parse('''
         ... <licenses>
         ...   <license>
         ...     <name>Apache License, Version 2.0</name>
         ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         ...   </license>
         ...   <license>
         ...     <name>MIT License</name>
         ...     <url>https://opensource.org/licenses/MIT</url>
         ...   </license>
         ... </licenses>
         ... ''')
         >>> pprint(MavenMapping().parse_licenses(d))
         [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
          {'@id': 'https://opensource.org/licenses/MIT'}]
         """
         licenses = d.get("licenses")
         if not isinstance(licenses, dict):
             return
         licenses = licenses.get("license")
         if isinstance(licenses, dict):
             licenses = [licenses]
         elif not isinstance(licenses, list):
             return
         return [
             {"@id": license["url"]}
             for license in licenses
             if isinstance(license, dict) and isinstance(license.get("url"), str)
         ] or None
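Finally, a sketch of the annotated MavenMapping.translate on a tiny, made-up pom.xml; the coordinates and values below are illustrative only.

from swh.indexer.metadata_dictionary.maven import MavenMapping

pom = b"""<?xml version="1.0" encoding="UTF-8"?>
<project>
  <name>example-artifact</name>
  <groupId>org.example</groupId>
  <artifactId>example-artifact</artifactId>
  <version>1.2.3</version>
  <licenses>
    <license>
      <name>Apache License, Version 2.0</name>
      <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
    </license>
  </licenses>
</project>
"""
# Expected shape: schema.org name/version, a license @id, and a codeRepository
# URL derived from groupId/artifactId against the default Maven repository.
print(MavenMapping().translate(pom))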