diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index be2eae1..5208745 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,272 +1,308 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+import xml.parsers.expat

 from typing_extensions import TypedDict
+import xmltodict
 import yaml

 from swh.indexer.codemeta import compact, merge_values
 from swh.indexer.namespaces import SCHEMA
 from swh.indexer.storage.interface import Sha1


 class DirectoryLsEntry(TypedDict):
     target: Sha1
     sha1: Sha1
     name: bytes
     type: str


 TTranslateCallable = TypeVar(
     "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None]
 )


 def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
     """Returns a decorator that marks the decorated function as adding
     the given terms to the ``translated_metadata`` dict"""

     def decorator(f: TTranslateCallable) -> TTranslateCallable:
         if not hasattr(f, "produced_terms"):
             f.produced_terms = []  # type: ignore
         f.produced_terms.extend(uris)  # type: ignore
         return f

     return decorator


 class BaseMapping:
     """Base class for :class:`BaseExtrinsicMapping` and
     :class:`BaseIntrinsicMapping`, not to be inherited directly."""

     def __init__(self, log_suffix=""):
         self.log_suffix = log_suffix
         self.log = logging.getLogger(
             "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
         )

     @property
     def name(self):
         """A name of this mapping, used as an identifier in the
         indexer storage."""
         raise NotImplementedError(f"{self.__class__.__name__}.name")

     def translate(self, file_content: bytes) -> Optional[Dict]:
         """Translates metadata, from the content of a file or of a
         RawExtrinsicMetadata object."""
         raise NotImplementedError(f"{self.__class__.__name__}.translate")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")


 class BaseExtrinsicMapping(BaseMapping):
     """Base class for extrinsic_metadata mappings to inherit from

     To implement a new mapping:

     - inherit this class
     - override translate function
     """

     @classmethod
     def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
         """
         Returns the list of extrinsic metadata formats which can be translated
         by this mapping
         """
         raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata, forgefed=True)


 class BaseIntrinsicMapping(BaseMapping):
     """Base class for intrinsic-metadata mappings to inherit from

     To implement a new mapping:

     - inherit this class
     - override translate function
     """

     @classmethod
     def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         """
         Returns the sha1 hashes of files which can be translated by this mapping
         """
         raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata, forgefed=False)


 class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
     """Base class for all intrinsic metadata mappings that use a single file as input."""

     @property
     def filename(self):
         """The .json file to extract metadata from."""
from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields = [] # type: List[str] """List of fields that are simple strings, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { term for (key, term) in cls.mapping.items() if key in cls.string_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { term for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms - def _translate_dict(self, content_dict: Dict) -> Dict[str, str]: + def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ translated_metadata = {"@type": SCHEMA.SoftwareSourceCode} for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(translated_metadata, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) elif k in self.string_fields and isinstance(v, str): pass elif k in self.string_fields and isinstance(v, list): v = [x for x in v if isinstance(x, str)] else: continue # set the translation metadata with the normalized value if codemeta_key in translated_metadata: translated_metadata[codemeta_key] = merge_values( translated_metadata[codemeta_key], v ) else: translated_metadata[codemeta_key] = v self.extra_translation(translated_metadata, content_dict) return self.normalize_translation(translated_metadata) def extra_translation(self, translated_metadata: Dict[str, Any], d: Dict[str, Any]): """Called at the end of the translation process, and may add arbitrary keys to ``translated_metadata`` based on the input dictionary (passed as ``d``). 
""" pass class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None +class XmlMapping(DictMapping): + """Base class for all mappings that use XML data as input.""" + + def translate(self, raw_content: bytes) -> Optional[Dict]: + """ + Translates content by parsing content from a bytestring containing + XML data and translating with the appropriate mapping + + Args: + raw_content (bytes): raw content to translate + + Returns: + dict: translated metadata in json-friendly form needed for + the indexer + + """ + try: + d = xmltodict.parse(raw_content) + except xml.parsers.expat.ExpatError: + self.log.warning("Error parsing XML from %s", self.log_suffix) + return None + except UnicodeDecodeError: + self.log.warning("Error unidecoding XML from %s", self.log_suffix) + return None + except (LookupError, ValueError): + # unknown encoding or multi-byte encoding + self.log.warning("Error detecting XML encoding from %s", self.log_suffix) + return None + if not isinstance(d, dict): + self.log.warning("Skipping ill-formed XML content: %s", raw_content) + return None + return self._translate_dict(d) + + class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() } class YamlMapping(DictMapping, SingleFileIntrinsicMapping): """Base class for all mappings that use Yaml data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.scanner.ScannerError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py index 40c9de4..179538b 100644 --- a/swh/indexer/metadata_dictionary/maven.py +++ b/swh/indexer/metadata_dictionary/maven.py @@ -1,169 +1,151 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os -from typing import Any, Dict, Optional -import xml.parsers.expat - -import xmltodict +from typing import Any, Dict from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import SCHEMA -from .base import DictMapping, SingleFileIntrinsicMapping +from .base import SingleFileIntrinsicMapping, XmlMapping -class MavenMapping(DictMapping, SingleFileIntrinsicMapping): +class MavenMapping(XmlMapping, SingleFileIntrinsicMapping): """ dedicated class for Maven (pom.xml) mapping and translation """ name = "maven" filename = b"pom.xml" mapping = 
CROSSWALK_TABLE["Java (Maven)"] string_fields = ["name", "version", "description", "email"] - def translate(self, content: bytes) -> Optional[Dict[str, Any]]: - try: - d = xmltodict.parse(content).get("project") or {} - except xml.parsers.expat.ExpatError: - self.log.warning("Error parsing XML from %s", self.log_suffix) - return None - except UnicodeDecodeError: - self.log.warning("Error unidecoding XML from %s", self.log_suffix) - return None - except (LookupError, ValueError): - # unknown encoding or multi-byte encoding - self.log.warning("Error detecting XML encoding from %s", self.log_suffix) - return None - if not isinstance(d, dict): - self.log.warning("Skipping ill-formed XML content: %s", content) - return None - return self._translate_dict(d) - _default_repository = {"url": "https://repo.maven.apache.org/maven2/"} + def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]: + return super()._translate_dict(d.get("project") or {}) + def extra_translation(self, translated_metadata, d): repositories = self.parse_repositories(d) if repositories: translated_metadata[SCHEMA.codeRepository] = repositories def parse_repositories(self, d): """https://maven.apache.org/pom.html#Repositories >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... ... ... codehausSnapshots ... Codehaus Snapshots ... http://snapshots.maven.codehaus.org/maven2 ... default ... ... ... ''') >>> MavenMapping().parse_repositories(d) """ repositories = d.get("repositories") if not repositories: results = [self.parse_repository(d, self._default_repository)] elif isinstance(repositories, dict): repositories = repositories.get("repository") or [] if not isinstance(repositories, list): repositories = [repositories] results = [self.parse_repository(d, repo) for repo in repositories] else: results = [] return [res for res in results if res] or None def parse_repository(self, d, repo): if not isinstance(repo, dict): return if repo.get("layout", "default") != "default": return # TODO ? url = repo.get("url") group_id = d.get("groupId") artifact_id = d.get("artifactId") if ( isinstance(url, str) and isinstance(group_id, str) and isinstance(artifact_id, str) ): repo = os.path.join(url, *group_id.split("."), artifact_id) return {"@id": repo} def normalize_groupId(self, id_): """https://maven.apache.org/pom.html#Maven_Coordinates >>> MavenMapping().normalize_groupId('org.example') {'@id': 'org.example'} """ if isinstance(id_, str): return {"@id": id_} def translate_licenses(self, translated_metadata, d): licenses = self.parse_licenses(d) if licenses: translated_metadata[SCHEMA.license] = licenses def parse_licenses(self, licenses): """https://maven.apache.org/pom.html#Licenses >>> import xmltodict >>> import json >>> d = xmltodict.parse(''' ... ... ... Apache License, Version 2.0 ... https://www.apache.org/licenses/LICENSE-2.0.txt ... ... ... ''') >>> print(json.dumps(d, indent=4)) { "licenses": { "license": { "name": "Apache License, Version 2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0.txt" } } } >>> MavenMapping().parse_licenses(d["licenses"]) [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}] or, if there are more than one license: >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... ... ... Apache License, Version 2.0 ... https://www.apache.org/licenses/LICENSE-2.0.txt ... ... ... MIT License ... https://opensource.org/licenses/MIT ... ... ... 
         ... ''')
         >>> pprint(MavenMapping().parse_licenses(d["licenses"]))
         [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
          {'@id': 'https://opensource.org/licenses/MIT'}]
         """
         if not isinstance(licenses, dict):
             return
         licenses = licenses.get("license")
         if isinstance(licenses, dict):
             licenses = [licenses]
         elif not isinstance(licenses, list):
             return
         return [
             {"@id": license["url"]}
             for license in licenses
             if isinstance(license, dict) and isinstance(license.get("url"), str)
         ] or None
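
For context on the `d.get("project") or {}` override above: xmltodict keeps the document's root element as the outermost key, so MavenMapping unwraps it before delegating to the shared DictMapping logic. A minimal illustrative snippet (the pom content is invented):

    import xmltodict

    pom = b"<project><name>Example</name><version>1.0.0</version></project>"

    d = xmltodict.parse(pom)
    # d is roughly {'project': {'name': 'Example', 'version': '1.0.0'}}
    # (it may be an OrderedDict depending on the xmltodict version)
    inner = d.get("project") or {}  # what the override passes to super()._translate_dict()
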
{"@list": authors} def translate_releaseNotes(self, translated_metadata, s): if isinstance(s, str): translated_metadata.setdefault("http://schema.org/releaseNotes", []).append( s ) def normalize_tags(self, s): if isinstance(s, str): return s.split(" ")