diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index 415f7fc..9f62018 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,351 +1,360 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar import uuid import xml.parsers.expat from pyld import jsonld import rdflib from typing_extensions import TypedDict import xmltodict import yaml from swh.indexer.codemeta import _document_loader, compact from swh.indexer.namespaces import RDF, SCHEMA from swh.indexer.storage.interface import Sha1 class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None], ) def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): f.produced_terms = [] # type: ignore f.produced_terms.extend(uris) # type: ignore return f return decorator class BaseMapping: """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`, not to be inherited directly.""" def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") def translate(self, file_content: bytes) -> 
Optional[Dict]: """Translates metadata, from the content of a file or of a RawExtrinsicMetadata object.""" raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation") class BaseExtrinsicMapping(BaseMapping): """Base class for extrinsic_metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=True) class BaseIntrinsicMapping(BaseMapping): """Base class for intrinsic-metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=False) class SingleFileIntrinsicMapping(BaseIntrinsicMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly 
a key-value store (eg. a shallow JSON dict).""" - string_fields = [] # type: List[str] + string_fields: List[str] = [] """List of fields that are simple strings, and don't need any normalization.""" + uri_fields: List[str] = [] + """List of fields that are simple URIs, and don't need any + normalization.""" + @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { str(term) for (key, term) in cls.mapping.items() - if key in cls.string_fields + if key in cls.string_fields + cls.uri_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { str(term) for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ graph = rdflib.Graph() # The main object being described (the SoftwareSourceCode) does not necessarily # may or may not have an id. # Either way, we temporarily use this URI to identify it. 
Unfortunately, # we cannot use a blank node as we need to use it for JSON-LD framing later, # and blank nodes cannot be used for framing in JSON-LD >= 1.1 root_id = ( "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/" + str(uuid.uuid4()) ) root = rdflib.URIRef(root_id) graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode)) for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(graph, root, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value, # and add its results to the triples normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) if v is None: pass elif isinstance(v, list): for item in reversed(v): graph.add((root, codemeta_key, item)) else: graph.add((root, codemeta_key, v)) elif k in self.string_fields and isinstance(v, str): graph.add((root, codemeta_key, rdflib.Literal(v))) elif k in self.string_fields and isinstance(v, list): for item in v: graph.add((root, codemeta_key, rdflib.Literal(item))) + elif k in self.uri_fields and isinstance(v, str): + graph.add((root, codemeta_key, rdflib.URIRef(v))) + elif k in self.uri_fields and isinstance(v, list): + for item in v: + graph.add((root, codemeta_key, rdflib.URIRef(item))) else: continue self.extra_translation(graph, root, content_dict) # Convert from rdflib's internal graph representation to JSON s = graph.serialize(format="application/ld+json") # Load from JSON to a list of Python objects jsonld_graph = json.loads(s) # Use JSON-LD framing to turn the graph into a rooted tree # frame = {"@type": str(SCHEMA.SoftwareSourceCode)} translated_metadata = jsonld.frame( jsonld_graph, {"@id": root_id}, 
options={ "documentLoader": _document_loader, "processingMode": "json-ld-1.1", }, ) # Remove the temporary id we added at the beginning if isinstance(translated_metadata["@id"], list): translated_metadata["@id"].remove(root_id) else: del translated_metadata["@id"] return self.normalize_translation(translated_metadata) def extra_translation( self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any] ): """Called at the end of the translation process, and may add arbitrary triples to ``graph`` based on the input dictionary (passed as ``d``). """ pass class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None class XmlMapping(DictMapping): """Base class for all mappings that use XML data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing XML data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: d = xmltodict.parse(raw_content) except xml.parsers.expat.ExpatError: self.log.warning("Error parsing XML from %s", self.log_suffix) return None except UnicodeDecodeError: self.log.warning("Error 
unidecoding XML from %s", self.log_suffix) return None except (LookupError, ValueError): # unknown encoding or multi-byte encoding self.log.warning("Error detecting XML encoding from %s", self.log_suffix) return None if not isinstance(d, dict): self.log.warning("Skipping ill-formed XML content: %s", raw_content) return None return self._translate_dict(d) class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() } class YamlMapping(DictMapping, SingleFileIntrinsicMapping): """Base class for all mappings that use Yaml data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.scanner.ScannerError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py index 4f4e99e..12121cc 100644 --- a/swh/indexer/metadata_dictionary/cff.py +++ b/swh/indexer/metadata_dictionary/cff.py @@ -1,66 +1,63 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import List from rdflib import BNode, Graph, Literal, URIRef import rdflib.term from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import RDF, SCHEMA from .base import YamlMapping from .utils import add_map DOI = URIRef("https://doi.org/") SPDX = URIRef("https://spdx.org/licenses/") class CffMapping(YamlMapping): """Dedicated class for Citation (CITATION.cff) mapping and translation""" name = "cff" filename = b"CITATION.cff" mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 
1.0.2"] string_fields = ["keywords", "license", "abstract", "version", "doi"] + uri_fields = ["repository-code"] def _translate_author(self, graph: Graph, author: dict) -> rdflib.term.Node: node: rdflib.term.Node if "orcid" in author and isinstance(author["orcid"], str): node = URIRef(author["orcid"]) else: node = BNode() graph.add((node, RDF.type, SCHEMA.Person)) if "affiliation" in author and isinstance(author["affiliation"], str): affiliation = BNode() graph.add((node, SCHEMA.affiliation, affiliation)) graph.add((affiliation, RDF.type, SCHEMA.Organization)) graph.add((affiliation, SCHEMA.name, Literal(author["affiliation"]))) if "family-names" in author and isinstance(author["family-names"], str): graph.add((node, SCHEMA.familyName, Literal(author["family-names"]))) if "given-names" in author and isinstance(author["given-names"], str): graph.add((node, SCHEMA.givenName, Literal(author["given-names"]))) return node def translate_authors( self, graph: Graph, root: URIRef, authors: List[dict] ) -> None: add_map(graph, root, SCHEMA.author, self._translate_author, authors) def normalize_doi(self, s: str) -> URIRef: if isinstance(s, str): return DOI + s def normalize_license(self, s: str) -> URIRef: if isinstance(s, str): return SPDX + s - def normalize_repository_code(self, s: str) -> URIRef: - if isinstance(s, str): - return URIRef(s) - def normalize_date_released(self, s: str) -> Literal: if isinstance(s, str): return Literal(s, datatype=SCHEMA.Date) diff --git a/swh/indexer/metadata_dictionary/composer.py b/swh/indexer/metadata_dictionary/composer.py index 294b38d..a43fc23 100644 --- a/swh/indexer/metadata_dictionary/composer.py +++ b/swh/indexer/metadata_dictionary/composer.py @@ -1,65 +1,61 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path from typing 
import Optional from rdflib import BNode, Graph, Literal, URIRef from swh.indexer.codemeta import _DATA_DIR, _read_crosstable from swh.indexer.namespaces import RDF, SCHEMA from .base import JsonMapping, SingleFileIntrinsicMapping from .utils import add_map SPDX = URIRef("https://spdx.org/licenses/") COMPOSER_TABLE_PATH = os.path.join(_DATA_DIR, "composer.csv") with open(COMPOSER_TABLE_PATH) as fd: (CODEMETA_TERMS, COMPOSER_TABLE) = _read_crosstable(fd) class ComposerMapping(JsonMapping, SingleFileIntrinsicMapping): """Dedicated class for Packagist(composer.json) mapping and translation""" name = "composer" mapping = COMPOSER_TABLE["Composer"] filename = b"composer.json" string_fields = [ "name", "description", "version", "keywords", - "homepage", "license", "author", "authors", ] - - def normalize_homepage(self, s): - if isinstance(s, str): - return URIRef(s) + uri_fields = ["homepage"] def normalize_license(self, s): if isinstance(s, str): return SPDX + s def _translate_author(self, graph: Graph, author) -> Optional[BNode]: if not isinstance(author, dict): return None node = BNode() graph.add((node, RDF.type, SCHEMA.Person)) if isinstance(author.get("name"), str): graph.add((node, SCHEMA.name, Literal(author["name"]))) if isinstance(author.get("email"), str): graph.add((node, SCHEMA.email, Literal(author["email"]))) return node def translate_authors(self, graph: Graph, root: URIRef, authors) -> None: add_map(graph, root, SCHEMA.author, self._translate_author, authors) diff --git a/swh/indexer/metadata_dictionary/dart.py b/swh/indexer/metadata_dictionary/dart.py index f3b9e87..ec6dfb2 100644 --- a/swh/indexer/metadata_dictionary/dart.py +++ b/swh/indexer/metadata_dictionary/dart.py @@ -1,79 +1,75 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path import re 
from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import _DATA_DIR, _read_crosstable from swh.indexer.namespaces import SCHEMA from .base import YamlMapping from .utils import add_map SPDX = URIRef("https://spdx.org/licenses/") PUB_TABLE_PATH = os.path.join(_DATA_DIR, "pubspec.csv") with open(PUB_TABLE_PATH) as fd: (CODEMETA_TERMS, PUB_TABLE) = _read_crosstable(fd) def name_to_person(name): return { "@type": SCHEMA.Person, SCHEMA.name: name, } class PubspecMapping(YamlMapping): name = "pubspec" filename = b"pubspec.yaml" mapping = PUB_TABLE["Pubspec"] string_fields = [ "repository", "keywords", "description", "name", - "homepage", "issue_tracker", "platforms", "license" # license will only be used with the SPDX Identifier ] + uri_fields = ["homepage"] def normalize_license(self, s): if isinstance(s, str): return SPDX + s - def normalize_homepage(self, s): - if isinstance(s, str): - return URIRef(s) - def _translate_author(self, graph, s): name_email_re = re.compile("(?P<name>.*?)( <(?P<email>.*)>)") if isinstance(s, str): author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) match = name_email_re.search(s) if match: name = match.group("name") email = match.group("email") graph.add((author, SCHEMA.email, Literal(email))) else: name = s graph.add((author, SCHEMA.name, Literal(name))) return author def translate_author(self, graph: Graph, root, s) -> None: add_map(graph, root, SCHEMA.author, self._translate_author, [s]) def translate_authors(self, graph: Graph, root, authors) -> None: if isinstance(authors, list): add_map(graph, root, SCHEMA.author, self._translate_author, authors) diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py index 450ec72..1540ef6 100644 --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -1,290 +1,282 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # 
License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import urllib.parse from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import SCHEMA from .base import JsonMapping, SingleFileIntrinsicMapping from .utils import add_list, prettyprint_graph # noqa SPDX = URIRef("https://spdx.org/licenses/") class NpmMapping(JsonMapping, SingleFileIntrinsicMapping): """ dedicated class for NPM (package.json) mapping and translation """ name = "npm" mapping = CROSSWALK_TABLE["NodeJS"] filename = b"package.json" - string_fields = ["name", "version", "homepage", "description", "email"] + string_fields = ["name", "version", "description", "email"] + uri_fields = ["homepage"] _schema_shortcuts = { "github": "git+https://github.com/%s.git", "gist": "git+https://gist.github.com/%s.git", "gitlab": "git+https://gitlab.com/%s.git", # Bitbucket supports both hg and git, and the shortcut does not # tell which one to use. # 'bitbucket': 'https://bitbucket.org/', } def normalize_repository(self, d): """https://docs.npmjs.com/files/package.json#repository >>> NpmMapping().normalize_repository({ ... 'type': 'git', ... 'url': 'https://example.org/foo.git' ... }) rdflib.term.URIRef('git+https://example.org/foo.git') >>> NpmMapping().normalize_repository( ... 'gitlab:foo/bar') rdflib.term.URIRef('git+https://gitlab.com/foo/bar.git') >>> NpmMapping().normalize_repository( ... 
'foo/bar') rdflib.term.URIRef('git+https://github.com/foo/bar.git') """ if ( isinstance(d, dict) and isinstance(d.get("type"), str) and isinstance(d.get("url"), str) ): url = "{type}+{url}".format(**d) elif isinstance(d, str): if "://" in d: url = d elif ":" in d: (schema, rest) = d.split(":", 1) if schema in self._schema_shortcuts: url = self._schema_shortcuts[schema] % rest else: return None else: url = self._schema_shortcuts["github"] % d else: return None return URIRef(url) def normalize_bugs(self, d): """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ ... 'url': 'https://example.org/bugs/', ... 'email': 'bugs@example.org' ... }) rdflib.term.URIRef('https://example.org/bugs/') >>> NpmMapping().normalize_bugs( ... 'https://example.org/bugs/') rdflib.term.URIRef('https://example.org/bugs/') """ if isinstance(d, dict) and isinstance(d.get("url"), str): return URIRef(d["url"]) elif isinstance(d, str): return URIRef(d) else: return None _parse_author = re.compile( r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$" ) def translate_author(self, graph: Graph, root, d): r"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint >>> root = URIRef("http://example.org/test-software") >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, { ... 'name': 'John Doe', ... 'email': 'john.doe@example.org', ... 'url': 'https://example.org/~john.doe', ... }) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe", "http://schema.org/url": { "@id": "https://example.org/~john.doe" } } ] } } >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, ... 'John Doe <john.doe@example.org> (https://example.org/~john.doe)' ... 
) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe", "http://schema.org/url": { "@id": "https://example.org/~john.doe" } } ] } } >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, { ... 'name': 'John Doe', ... 'email': 'john.doe@example.org', ... 'url': 'https:\\\\example.invalid/~john.doe', ... }) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe" } ] } } """ # noqa author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) if isinstance(d, dict): name = d.get("name", None) email = d.get("email", None) url = d.get("url", None) elif isinstance(d, str): match = self._parse_author.match(d) if not match: return None name = match.group("name") email = match.group("email") url = match.group("url") else: return None if name and isinstance(name, str): graph.add((author, SCHEMA.name, Literal(name))) if email and isinstance(email, str): graph.add((author, SCHEMA.email, Literal(email))) if url and isinstance(url, str): # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop # URLs that are blatantly invalid early, so PyLD does not crash. parsed_url = urllib.parse.urlparse(url) if parsed_url.netloc: graph.add((author, SCHEMA.url, URIRef(url))) add_list(graph, root, SCHEMA.author, [author]) def normalize_description(self, description): r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat common mistake that causes issues in the database because of null bytes in JSON. >>> NpmMapping().normalize_description("foo bar") rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00" ... 
) rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 " ... ) rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... # invalid UTF-16 and meaningless UTF-8: ... "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description( ... # ditto (ut looks like little-endian at first) ... "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description(None) is None True """ if not isinstance(description, str): return None # XXX: if this function ever need to support more cases, consider # switching to https://pypi.org/project/ftfy/ instead of adding more hacks if description.startswith("\ufffd\ufffd") and "\x00" in description: # 2 unicode replacement characters followed by '# ' encoded as UTF-16 # is a common mistake, which indicates a README.md was saved as UTF-16, # and some NPM tool opened it as UTF-8 and used the first line as # description. description_bytes = description.encode() # Strip the the two unicode replacement characters assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd") description_bytes = description_bytes[6:] # If the following attempts fail to recover the description, discard it # entirely because the current indexer storage backend (postgresql) cannot # store zero bytes in JSON columns. 
description = None if not description_bytes.startswith(b"\x00"): # try UTF-16 little-endian (the most common) first try: description = description_bytes.decode("utf-16le") except UnicodeDecodeError: pass if description is None: # if it fails, try UTF-16 big-endian try: description = description_bytes.decode("utf-16be") except UnicodeDecodeError: pass if description: if description.startswith("# "): description = description[2:] return Literal(description.rstrip()) else: return None return Literal(description) def normalize_license(self, s): """https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') rdflib.term.URIRef('https://spdx.org/licenses/MIT') """ if isinstance(s, str): return SPDX + s - def normalize_homepage(self, s): - """https://docs.npmjs.com/files/package.json#homepage - - >>> NpmMapping().normalize_homepage('https://example.org/~john.doe') - rdflib.term.URIRef('https://example.org/~john.doe') - """ - if isinstance(s, str): - return URIRef(s) - def normalize_keywords(self, lst): """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) [rdflib.term.Literal('foo'), rdflib.term.Literal('bar')] """ if isinstance(lst, list): return [Literal(x) for x in lst if isinstance(x, str)] diff --git a/swh/indexer/metadata_dictionary/nuget.py b/swh/indexer/metadata_dictionary/nuget.py index 58fe765..62f7ea9 100644 --- a/swh/indexer/metadata_dictionary/nuget.py +++ b/swh/indexer/metadata_dictionary/nuget.py @@ -1,104 +1,95 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path import re from typing import Any, Dict, List from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import _DATA_DIR, _read_crosstable from swh.indexer.namespaces import SCHEMA from 
swh.indexer.storage.interface import Sha1 from .base import BaseIntrinsicMapping, DirectoryLsEntry, XmlMapping from .utils import add_list NUGET_TABLE_PATH = os.path.join(_DATA_DIR, "nuget.csv") with open(NUGET_TABLE_PATH) as fd: (CODEMETA_TERMS, NUGET_TABLE) = _read_crosstable(fd) SPDX = URIRef("https://spdx.org/licenses/") class NuGetMapping(XmlMapping, BaseIntrinsicMapping): """ dedicated class for NuGet (.nuspec) mapping and translation """ name = "nuget" mapping = NUGET_TABLE["NuGet"] mapping["copyright"] = URIRef("http://schema.org/copyrightNotice") mapping["language"] = URIRef("http://schema.org/inLanguage") string_fields = [ "description", "version", - "projectUrl", "name", "tags", "license", - "licenseUrl", "summary", "copyright", "language", ] + uri_fields = ["projectUrl", "licenseUrl"] @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].endswith(b".nuspec"): return [entry["sha1"]] return [] def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]: return super()._translate_dict(d.get("package", {}).get("metadata", {})) - def normalize_projectUrl(self, s): - if isinstance(s, str): - return URIRef(s) - def translate_repository(self, graph, root, v): if isinstance(v, dict) and isinstance(v["@url"], str): codemeta_key = URIRef(self.mapping["repository.url"]) graph.add((root, codemeta_key, URIRef(v["@url"]))) def normalize_license(self, v): if isinstance(v, dict) and v["@type"] == "expression": license_string = v["#text"] if not bool( re.search(r" with |\(|\)| and ", license_string, re.IGNORECASE) ): return [ SPDX + license_type.strip() for license_type in re.split( r" or ", license_string, flags=re.IGNORECASE ) ] else: return None - def normalize_licenseUrl(self, s): - if isinstance(s, str): - return URIRef(s) - def translate_authors(self, graph: Graph, root, s): if isinstance(s, str): authors = [] for author_name in s.split(","): author_name = author_name.strip() 
author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) graph.add((author, SCHEMA.name, Literal(author_name))) authors.append(author) add_list(graph, root, SCHEMA.author, authors) def translate_releaseNotes(self, graph: Graph, root, s): if isinstance(s, str): graph.add((root, SCHEMA.releaseNotes, Literal(s))) def normalize_tags(self, s): if isinstance(s, str): return [Literal(tag) for tag in s.split(" ")] diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py index a0f20af..71a0b10 100644 --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -1,133 +1,130 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import itertools import re from typing import List from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.namespaces import SCHEMA from swh.indexer.storage.interface import Sha1 from .base import BaseIntrinsicMapping, DictMapping from .utils import add_map SPDX = URIRef("https://spdx.org/licenses/") def name_to_person(graph: Graph, name): if not isinstance(name, str): return None author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) graph.add((author, SCHEMA.name, Literal(name))) return author class GemspecMapping(BaseIntrinsicMapping, DictMapping): name = "gemspec" mapping = CROSSWALK_TABLE["Ruby Gem"] string_fields = ["name", "version", "description", "summary", "email"] + uri_fields = ["homepage"] _re_spec_new = re.compile(r".*Gem::Specification.new +(do|\{) +\|.*\|.*") _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for 
entry in file_entries: if entry["name"].endswith(b".gemspec"): return [entry["sha1"]] return [] def translate(self, raw_content): try: raw_content = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return # Skip lines before 'Gem::Specification.new' lines = itertools.dropwhile( lambda x: not self._re_spec_new.match(x), raw_content.split("\n") ) try: next(lines) # Consume 'Gem::Specification.new' except StopIteration: self.log.warning("Could not find Gem::Specification in %s", self.log_suffix) return content_dict = {} for line in lines: match = self._re_spec_entry.match(line) if match: value = self.eval_ruby_expression(match.group("expr")) if value: content_dict[match.group("key")] = value return self._translate_dict(content_dict) def eval_ruby_expression(self, expr): """Very simple evaluator of Ruby expressions. >>> GemspecMapping().eval_ruby_expression('"Foo bar"') 'Foo bar' >>> GemspecMapping().eval_ruby_expression("'Foo bar'") 'Foo bar' >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']") ['Foo', 'bar'] >>> GemspecMapping().eval_ruby_expression("'Foo bar'.freeze") 'Foo bar' >>> GemspecMapping().eval_ruby_expression( \ "['Foo'.freeze, 'bar'.freeze]") ['Foo', 'bar'] """ def evaluator(node): if isinstance(node, ast.Str): return node.s elif isinstance(node, ast.List): res = [] for element in node.elts: val = evaluator(element) if not val: return res.append(val) return res expr = expr.replace(".freeze", "") try: # We're parsing Ruby expressions here, but Python's # ast.parse works for very simple Ruby expressions # (mainly strings delimited with " or ', and lists # of such strings). 
tree = ast.parse(expr, mode="eval") except (SyntaxError, ValueError): return if isinstance(tree, ast.Expression): return evaluator(tree.body) - def normalize_homepage(self, s): - if isinstance(s, str): - return URIRef(s) - def normalize_license(self, s): if isinstance(s, str): return SPDX + s def normalize_licenses(self, licenses): if isinstance(licenses, list): return [SPDX + license for license in licenses if isinstance(license, str)] def translate_author(self, graph: Graph, root, author): if isinstance(author, str): add_map(graph, root, SCHEMA.author, name_to_person, [author]) def translate_authors(self, graph: Graph, root, authors): if isinstance(authors, list): add_map(graph, root, SCHEMA.author, name_to_person, authors)