diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index b4e781a..7d63e0e 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,405 +1,404 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar -import urllib.parse import uuid import xml.parsers.expat from pyld import jsonld import rdflib from typing_extensions import TypedDict import xmltodict import yaml from swh.indexer.codemeta import _document_loader, compact from swh.indexer.namespaces import RDF, SCHEMA from swh.indexer.storage.interface import Sha1 +from .utils import add_url_if_valid + TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/" """Prefix used to generate temporary URIs for root nodes being translated.""" class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None], ) def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): f.produced_terms = [] # type: ignore f.produced_terms.extend(uris) # type: ignore return f return decorator class BaseMapping: """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`, not to be inherited directly.""" def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing mapping-specific data and translating with the appropriate mapping to JSON-LD using the Codemeta and ForgeFed vocabularies. Args: raw_content: raw content to translate Returns: translated metadata in JSON friendly form needed for the content if parseable, :const:`None` otherwise. """ raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation") class BaseExtrinsicMapping(BaseMapping): """Base class for extrinsic_metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=True) class BaseIntrinsicMapping(BaseMapping): """Base class for intrinsic-metadata mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata, forgefed=False) class SingleFileIntrinsicMapping(BaseIntrinsicMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields: List[str] = [] """List of fields that are simple strings, and don't need any normalization.""" date_fields: List[str] = [] """List of fields that are strings that should be typed as http://schema.org/Date """ uri_fields: List[str] = [] """List of fields that are simple URIs, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { str(term) for (key, term) in cls.mapping.items() if key in cls.string_fields + cls.date_fields + cls.uri_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { str(term) for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms def get_root_uri(self, content_dict: Dict) -> rdflib.URIRef: """Returns an URI for the SoftwareSourceCode or Repository being described. The default implementation uses a temporary URI that is stripped before normalization by :meth:`_translate_dict`. """ # The main object being described (the SoftwareSourceCode) does not necessarily # may or may not have an id. # If it does, it will need to be set by a subclass. # If it doesn't we temporarily use this URI to identify it. Unfortunately, # we cannot use a blank node as we need to use it for JSON-LD framing later, # and blank nodes cannot be used for framing in JSON-LD >= 1.1 root_id = TMP_ROOT_URI_PREFIX + str(uuid.uuid4()) return rdflib.URIRef(root_id) def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ graph = rdflib.Graph() root = self.get_root_uri(content_dict) self._translate_to_graph(graph, root, content_dict) self.sanitize(graph) # Convert from rdflib's internal graph representation to JSON s = graph.serialize(format="application/ld+json") # Load from JSON to a list of Python objects jsonld_graph = json.loads(s) # Use JSON-LD framing to turn the graph into a rooted tree # frame = {"@type": str(SCHEMA.SoftwareSourceCode)} translated_metadata = jsonld.frame( jsonld_graph, {"@id": str(root)}, options={ "documentLoader": _document_loader, "processingMode": "json-ld-1.1", }, ) # Remove the temporary id we added at the beginning assert isinstance(translated_metadata["@id"], str) if translated_metadata["@id"].startswith(TMP_ROOT_URI_PREFIX): del translated_metadata["@id"] return self.normalize_translation(translated_metadata) def _translate_to_graph( self, graph: rdflib.Graph, root: rdflib.term.Identifier, content_dict: Dict ) -> None: """ Translates content by parsing content from a dict object and translating with the appropriate mapping to the graph passed as parameter Args: content_dict (dict): content dict to translate """ graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode)) for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(graph, root, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value, # and add its results to the triples normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) if v is None: pass elif isinstance(v, list): for item in reversed(v): - graph.add((root, codemeta_key, item)) + if isinstance(item, rdflib.URIRef): + add_url_if_valid(graph, root, codemeta_key, str(item)) + else: + graph.add((root, codemeta_key, item)) else: - graph.add((root, codemeta_key, v)) + if isinstance(v, rdflib.URIRef): + add_url_if_valid(graph, root, codemeta_key, str(v)) + else: + graph.add((root, codemeta_key, v)) elif k in self.string_fields and isinstance(v, str): graph.add((root, codemeta_key, rdflib.Literal(v))) elif k in self.string_fields and isinstance(v, list): for item in v: graph.add((root, codemeta_key, rdflib.Literal(item))) elif k in self.date_fields and isinstance(v, str): typed_v = rdflib.Literal(v, datatype=SCHEMA.Date) graph.add((root, codemeta_key, typed_v)) elif k in self.date_fields and isinstance(v, list): for item in v: if isinstance(item, str): typed_item = rdflib.Literal(item, datatype=SCHEMA.Date) graph.add((root, codemeta_key, typed_item)) elif k in self.uri_fields and isinstance(v, str): - # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop - # URLs that are blatantly invalid early, so PyLD does not crash. - parsed_url = urllib.parse.urlparse(v) - if parsed_url.netloc: - graph.add((root, codemeta_key, rdflib.URIRef(v))) + add_url_if_valid(graph, root, codemeta_key, v) elif k in self.uri_fields and isinstance(v, list): for item in v: - if isinstance(item, str): - # ditto - parsed_url = urllib.parse.urlparse(item) - if parsed_url.netloc: - graph.add((root, codemeta_key, rdflib.URIRef(item))) + add_url_if_valid(graph, root, codemeta_key, item) else: continue self.extra_translation(graph, root, content_dict) def sanitize(self, graph: rdflib.Graph) -> None: # Remove triples that make PyLD crash for (subject, predicate, _) in graph.triples((None, None, rdflib.URIRef(""))): graph.remove((subject, predicate, rdflib.URIRef(""))) # Should not happen, but we's better check as this may lead to incorrect data invalid = False for triple in graph.triples((rdflib.URIRef(""), None, None)): invalid = True logging.error("Empty triple subject URI: %r", triple) if invalid: raise ValueError("Empty triple subject(s)") def extra_translation( self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any] ) -> None: """Called at the end of the translation process, and may add arbitrary triples to ``graph`` based on the input dictionary (passed as ``d``). """ pass class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None class XmlMapping(DictMapping): """Base class for all mappings that use XML data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: try: d = xmltodict.parse(raw_content) except xml.parsers.expat.ExpatError: self.log.warning("Error parsing XML from %s", self.log_suffix) return None except UnicodeDecodeError: self.log.warning("Error unidecoding XML from %s", self.log_suffix) return None except (LookupError, ValueError): # unknown encoding or multi-byte encoding self.log.warning("Error detecting XML encoding from %s", self.log_suffix) return None if not isinstance(d, dict): self.log.warning("Skipping ill-formed XML content: %s", raw_content) return None return self._translate_dict(d) class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() } class YamlMapping(DictMapping, SingleFileIntrinsicMapping): """Base class for all mappings that use Yaml data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.scanner.ScannerError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py index 8ced23e..78ba661 100644 --- a/swh/indexer/metadata_dictionary/cff.py +++ b/swh/indexer/metadata_dictionary/cff.py @@ -1,60 +1,65 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import List +import urllib.parse from rdflib import BNode, Graph, Literal, URIRef import rdflib.term from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import RDF, SCHEMA from .base import YamlMapping from .utils import add_map DOI = URIRef("https://doi.org/") SPDX = URIRef("https://spdx.org/licenses/") class CffMapping(YamlMapping): """Dedicated class for Citation (CITATION.cff) mapping and translation""" name = "cff" filename = b"CITATION.cff" mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"] string_fields = ["keywords", "license", "abstract", "version", "doi"] date_fields = ["date-released"] uri_fields = ["repository-code"] def _translate_author(self, graph: Graph, author: dict) -> rdflib.term.Node: node: rdflib.term.Node - if "orcid" in author and isinstance(author["orcid"], str): + if ( + "orcid" in author + and isinstance(author["orcid"], str) + and urllib.parse.urlparse(author["orcid"]).netloc + ): node = URIRef(author["orcid"]) else: node = BNode() graph.add((node, RDF.type, SCHEMA.Person)) if "affiliation" in author and isinstance(author["affiliation"], str): affiliation = BNode() graph.add((node, SCHEMA.affiliation, affiliation)) graph.add((affiliation, RDF.type, SCHEMA.Organization)) graph.add((affiliation, SCHEMA.name, Literal(author["affiliation"]))) if "family-names" in author and isinstance(author["family-names"], str): graph.add((node, SCHEMA.familyName, Literal(author["family-names"]))) if "given-names" in author and isinstance(author["given-names"], str): graph.add((node, SCHEMA.givenName, Literal(author["given-names"]))) return node def translate_authors( self, graph: Graph, root: URIRef, authors: List[dict] ) -> None: add_map(graph, root, SCHEMA.author, self._translate_author, authors) def normalize_doi(self, s: str) -> URIRef: if isinstance(s, str): return DOI + s def normalize_license(self, s: str) -> URIRef: if isinstance(s, str): return SPDX + s diff --git a/swh/indexer/metadata_dictionary/github.py b/swh/indexer/metadata_dictionary/github.py index 25f6eff..0435c41 100644 --- a/swh/indexer/metadata_dictionary/github.py +++ b/swh/indexer/metadata_dictionary/github.py @@ -1,137 +1,136 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Tuple from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import ACTIVITYSTREAMS, CODEMETA, FORGEFED, SCHEMA from .base import BaseExtrinsicMapping, JsonMapping, produce_terms -from .utils import prettyprint_graph # noqa +from .utils import add_url_if_valid, prettyprint_graph # noqa SPDX = URIRef("https://spdx.org/licenses/") class GitHubMapping(BaseExtrinsicMapping, JsonMapping): name = "github" mapping = { **CROSSWALK_TABLE["GitHub"], "topics": SCHEMA.keywords, # TODO: submit this to the official crosswalk "clone_url": SCHEMA.codeRepository, } uri_fields = [ "clone_url", ] date_fields = [ "created_at", "updated_at", ] string_fields = [ "description", "full_name", "topics", ] @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: return ("application/vnd.github.v3+json",) def extra_translation(self, graph, root, content_dict): graph.remove((root, RDF.type, SCHEMA.SoftwareSourceCode)) graph.add((root, RDF.type, FORGEFED.Repository)) if content_dict.get("has_issues"): - graph.add( - ( - root, - CODEMETA.issueTracker, - URIRef(content_dict["html_url"] + "/issues"), - ) + add_url_if_valid( + graph, + root, + CODEMETA.issueTracker, + URIRef(content_dict["html_url"] + "/issues"), ) def get_root_uri(self, content_dict: dict) -> URIRef: if isinstance(content_dict.get("html_url"), str): return URIRef(content_dict["html_url"]) else: raise ValueError( f"GitHub metadata has missing/invalid html_url: {content_dict}" ) @produce_terms(FORGEFED.forks, ACTIVITYSTREAMS.totalItems) def translate_forks_count(self, graph: Graph, root: BNode, v: Any) -> None: """ >>> graph = Graph() >>> root = URIRef("http://example.org/test-software") >>> GitHubMapping().translate_forks_count(graph, root, 42) >>> prettyprint_graph(graph, root) { "@id": ..., "https://forgefed.org/ns#forks": { "@type": "https://www.w3.org/ns/activitystreams#OrderedCollection", "https://www.w3.org/ns/activitystreams#totalItems": 42 } } """ if isinstance(v, int): collection = BNode() graph.add((root, FORGEFED.forks, collection)) graph.add((collection, RDF.type, ACTIVITYSTREAMS.OrderedCollection)) graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v))) @produce_terms(ACTIVITYSTREAMS.likes, ACTIVITYSTREAMS.totalItems) def translate_stargazers_count(self, graph: Graph, root: BNode, v: Any) -> None: """ >>> graph = Graph() >>> root = URIRef("http://example.org/test-software") >>> GitHubMapping().translate_stargazers_count(graph, root, 42) >>> prettyprint_graph(graph, root) { "@id": ..., "https://www.w3.org/ns/activitystreams#likes": { "@type": "https://www.w3.org/ns/activitystreams#Collection", "https://www.w3.org/ns/activitystreams#totalItems": 42 } } """ if isinstance(v, int): collection = BNode() graph.add((root, ACTIVITYSTREAMS.likes, collection)) graph.add((collection, RDF.type, ACTIVITYSTREAMS.Collection)) graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v))) @produce_terms(ACTIVITYSTREAMS.followers, ACTIVITYSTREAMS.totalItems) def translate_watchers_count(self, graph: Graph, root: BNode, v: Any) -> None: """ >>> graph = Graph() >>> root = URIRef("http://example.org/test-software") >>> GitHubMapping().translate_watchers_count(graph, root, 42) >>> prettyprint_graph(graph, root) { "@id": ..., "https://www.w3.org/ns/activitystreams#followers": { "@type": "https://www.w3.org/ns/activitystreams#Collection", "https://www.w3.org/ns/activitystreams#totalItems": 42 } } """ if isinstance(v, int): collection = BNode() graph.add((root, ACTIVITYSTREAMS.followers, collection)) graph.add((collection, RDF.type, ACTIVITYSTREAMS.Collection)) graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v))) def normalize_license(self, d): """ >>> GitHubMapping().normalize_license({'spdx_id': 'MIT'}) rdflib.term.URIRef('https://spdx.org/licenses/MIT') """ if isinstance(d, dict) and isinstance(d.get("spdx_id"), str): return SPDX + d["spdx_id"] diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py index 8b3e48d..5575ba9 100644 --- a/swh/indexer/metadata_dictionary/maven.py +++ b/swh/indexer/metadata_dictionary/maven.py @@ -1,162 +1,163 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict -from rdflib import Graph, Literal, URIRef +from rdflib import Graph, Literal from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import SCHEMA from .base import SingleFileIntrinsicMapping, XmlMapping -from .utils import prettyprint_graph # noqa +from .utils import add_url_if_valid, prettyprint_graph # noqa class MavenMapping(XmlMapping, SingleFileIntrinsicMapping): """ dedicated class for Maven (pom.xml) mapping and translation """ name = "maven" filename = b"pom.xml" mapping = CROSSWALK_TABLE["Java (Maven)"] string_fields = ["name", "version", "description", "email"] _default_repository = {"url": "https://repo.maven.apache.org/maven2/"} def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]: return super()._translate_dict(d.get("project") or {}) def extra_translation(self, graph: Graph, root, d): self.parse_repositories(graph, root, d) def parse_repositories(self, graph: Graph, root, d): """https://maven.apache.org/pom.html#Repositories >>> import rdflib >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... ... ... codehausSnapshots ... Codehaus Snapshots ... http://snapshots.maven.codehaus.org/maven2 ... default ... ... ... ''') >>> MavenMapping().parse_repositories(rdflib.Graph(), rdflib.BNode(), d) """ repositories = d.get("repositories") if not repositories: self.parse_repository(graph, root, d, self._default_repository) elif isinstance(repositories, dict): repositories = repositories.get("repository") or [] if not isinstance(repositories, list): repositories = [repositories] for repo in repositories: self.parse_repository(graph, root, d, repo) def parse_repository(self, graph: Graph, root, d, repo): if not isinstance(repo, dict): return if repo.get("layout", "default") != "default": return # TODO ? url = repo.get("url") group_id = d.get("groupId") artifact_id = d.get("artifactId") if ( isinstance(url, str) and isinstance(group_id, str) and isinstance(artifact_id, str) ): repo = os.path.join(url, *group_id.split("."), artifact_id) if "${" in repo: # Often use as templating in pom.xml files collected from VCSs return - graph.add((root, SCHEMA.codeRepository, URIRef(repo))) + add_url_if_valid(graph, root, SCHEMA.codeRepository, repo) def normalize_groupId(self, id_): """https://maven.apache.org/pom.html#Maven_Coordinates >>> MavenMapping().normalize_groupId('org.example') rdflib.term.Literal('org.example') """ if isinstance(id_, str): return Literal(id_) def translate_licenses(self, graph, root, licenses): """https://maven.apache.org/pom.html#Licenses >>> import xmltodict >>> import json + >>> from rdflib import URIRef >>> d = xmltodict.parse(''' ... ... ... Apache License, Version 2.0 ... https://www.apache.org/licenses/LICENSE-2.0.txt ... ... ... ''') >>> print(json.dumps(d, indent=4)) { "licenses": { "license": { "name": "Apache License, Version 2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0.txt" } } } >>> graph = Graph() >>> root = URIRef("http://example.org/test-software") >>> MavenMapping().translate_licenses(graph, root, d["licenses"]) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/license": { "@id": "https://www.apache.org/licenses/LICENSE-2.0.txt" } } or, if there are more than one license: >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... ... ... Apache License, Version 2.0 ... https://www.apache.org/licenses/LICENSE-2.0.txt ... ... ... MIT License ... https://opensource.org/licenses/MIT ... ... ... ''') >>> graph = Graph() >>> root = URIRef("http://example.org/test-software") >>> MavenMapping().translate_licenses(graph, root, d["licenses"]) >>> pprint(set(graph.triples((root, URIRef("http://schema.org/license"), None)))) {(rdflib.term.URIRef('http://example.org/test-software'), rdflib.term.URIRef('http://schema.org/license'), rdflib.term.URIRef('https://opensource.org/licenses/MIT')), (rdflib.term.URIRef('http://example.org/test-software'), rdflib.term.URIRef('http://schema.org/license'), rdflib.term.URIRef('https://www.apache.org/licenses/LICENSE-2.0.txt'))} """ if not isinstance(licenses, dict): return licenses = licenses.get("license") if isinstance(licenses, dict): licenses = [licenses] elif not isinstance(licenses, list): return for license in licenses: - if isinstance(license, dict) and isinstance(license.get("url"), str): - graph.add((root, SCHEMA.license, URIRef(license["url"]))) + if isinstance(license, dict): + add_url_if_valid(graph, root, SCHEMA.license, license.get("url")) diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py index fd627b7..b838e5a 100644 --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -1,298 +1,288 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re -import urllib.parse from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import CROSSWALK_TABLE from swh.indexer.namespaces import SCHEMA from .base import JsonMapping, SingleFileIntrinsicMapping -from .utils import add_list, prettyprint_graph # noqa +from .utils import add_list, add_url_if_valid, prettyprint_graph # noqa SPDX = URIRef("https://spdx.org/licenses/") class NpmMapping(JsonMapping, SingleFileIntrinsicMapping): """ dedicated class for NPM (package.json) mapping and translation """ name = "npm" mapping = CROSSWALK_TABLE["NodeJS"] filename = b"package.json" string_fields = ["name", "version", "description", "email"] uri_fields = ["homepage"] _schema_shortcuts = { "github": "git+https://github.com/%s.git", "gist": "git+https://gist.github.com/%s.git", "gitlab": "git+https://gitlab.com/%s.git", # Bitbucket supports both hg and git, and the shortcut does not # tell which one to use. # 'bitbucket': 'https://bitbucket.org/', } def normalize_repository(self, d): """https://docs.npmjs.com/files/package.json#repository >>> NpmMapping().normalize_repository({ ... 'type': 'git', ... 'url': 'https://example.org/foo.git' ... }) rdflib.term.URIRef('git+https://example.org/foo.git') >>> NpmMapping().normalize_repository( ... 'gitlab:foo/bar') rdflib.term.URIRef('git+https://gitlab.com/foo/bar.git') >>> NpmMapping().normalize_repository( ... 'foo/bar') rdflib.term.URIRef('git+https://github.com/foo/bar.git') """ if ( isinstance(d, dict) and isinstance(d.get("type"), str) and isinstance(d.get("url"), str) ): url = "{type}+{url}".format(**d) elif isinstance(d, str): if "://" in d: url = d elif ":" in d: (schema, rest) = d.split(":", 1) if schema in self._schema_shortcuts: url = self._schema_shortcuts[schema] % rest else: return None else: url = self._schema_shortcuts["github"] % d else: return None return URIRef(url) def normalize_bugs(self, d): """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ ... 'url': 'https://example.org/bugs/', ... 'email': 'bugs@example.org' ... }) rdflib.term.URIRef('https://example.org/bugs/') >>> NpmMapping().normalize_bugs( ... 'https://example.org/bugs/') rdflib.term.URIRef('https://example.org/bugs/') """ if isinstance(d, dict) and isinstance(d.get("url"), str): url = d["url"] elif isinstance(d, str): url = d else: url = "" - parsed_url = urllib.parse.urlparse(url) - if parsed_url.netloc: - return URIRef(url) - else: - return None + return URIRef(url) _parse_author = re.compile( r"^ *" r"(?P.*?)" r"( +<(?P.*)>)?" r"( +\((?P.*)\))?" r" *$" ) def translate_author(self, graph: Graph, root, d): r"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint >>> root = URIRef("http://example.org/test-software") >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, { ... 'name': 'John Doe', ... 'email': 'john.doe@example.org', ... 'url': 'https://example.org/~john.doe', ... }) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe", "http://schema.org/url": { "@id": "https://example.org/~john.doe" } } ] } } >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, ... 'John Doe (https://example.org/~john.doe)' ... ) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe", "http://schema.org/url": { "@id": "https://example.org/~john.doe" } } ] } } >>> graph = Graph() >>> NpmMapping().translate_author(graph, root, { ... 'name': 'John Doe', ... 'email': 'john.doe@example.org', ... 'url': 'https:\\\\example.invalid/~john.doe', ... }) >>> prettyprint_graph(graph, root) { "@id": ..., "http://schema.org/author": { "@list": [ { "@type": "http://schema.org/Person", "http://schema.org/email": "john.doe@example.org", "http://schema.org/name": "John Doe" } ] } } """ # noqa author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) if isinstance(d, dict): name = d.get("name", None) email = d.get("email", None) url = d.get("url", None) elif isinstance(d, str): match = self._parse_author.match(d) if not match: return None name = match.group("name") email = match.group("email") url = match.group("url") else: return None if name and isinstance(name, str): graph.add((author, SCHEMA.name, Literal(name))) if email and isinstance(email, str): graph.add((author, SCHEMA.email, Literal(email))) - if url and isinstance(url, str): - # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop - # URLs that are blatantly invalid early, so PyLD does not crash. - parsed_url = urllib.parse.urlparse(url) - if parsed_url.netloc: - graph.add((author, SCHEMA.url, URIRef(url))) + add_url_if_valid(graph, author, SCHEMA.url, url) add_list(graph, root, SCHEMA.author, [author]) def normalize_description(self, description): r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat common mistake that causes issues in the database because of null bytes in JSON. >>> NpmMapping().normalize_description("foo bar") rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00" ... ) rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 " ... ) rdflib.term.Literal('foo bar') >>> NpmMapping().normalize_description( ... # invalid UTF-16 and meaningless UTF-8: ... "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description( ... # ditto (ut looks like little-endian at first) ... "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description(None) is None True """ if not isinstance(description, str): return None # XXX: if this function ever need to support more cases, consider # switching to https://pypi.org/project/ftfy/ instead of adding more hacks if description.startswith("\ufffd\ufffd") and "\x00" in description: # 2 unicode replacement characters followed by '# ' encoded as UTF-16 # is a common mistake, which indicates a README.md was saved as UTF-16, # and some NPM tool opened it as UTF-8 and used the first line as # description. description_bytes = description.encode() # Strip the the two unicode replacement characters assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd") description_bytes = description_bytes[6:] # If the following attempts fail to recover the description, discard it # entirely because the current indexer storage backend (postgresql) cannot # store zero bytes in JSON columns. description = None if not description_bytes.startswith(b"\x00"): # try UTF-16 little-endian (the most common) first try: description = description_bytes.decode("utf-16le") except UnicodeDecodeError: pass if description is None: # if it fails, try UTF-16 big-endian try: description = description_bytes.decode("utf-16be") except UnicodeDecodeError: pass if description: if description.startswith("# "): description = description[2:] return Literal(description.rstrip()) else: return None return Literal(description) def normalize_license(self, s): """https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') rdflib.term.URIRef('https://spdx.org/licenses/MIT') """ if isinstance(s, str): if s.startswith("SEE LICENSE IN "): # Very common pattern, because it is an example in the specification. # It is followed by the filename; and the indexer architecture currently # does not allow accessing that from metadata mappings. # (Plus, an hypothetical license mapping would eventually pick it up) return if " " in s: # Either an SPDX expression, or unusable data # TODO: handle it return return SPDX + s def normalize_keywords(self, lst): """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) [rdflib.term.Literal('foo'), rdflib.term.Literal('bar')] """ if isinstance(lst, list): return [Literal(x) for x in lst if isinstance(x, str)] diff --git a/swh/indexer/metadata_dictionary/nuget.py b/swh/indexer/metadata_dictionary/nuget.py index 087ec0e..b22100c 100644 --- a/swh/indexer/metadata_dictionary/nuget.py +++ b/swh/indexer/metadata_dictionary/nuget.py @@ -1,95 +1,95 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path import re from typing import Any, Dict, List from rdflib import RDF, BNode, Graph, Literal, URIRef from swh.indexer.codemeta import _DATA_DIR, read_crosstable from swh.indexer.namespaces import SCHEMA from swh.indexer.storage.interface import Sha1 from .base import BaseIntrinsicMapping, DirectoryLsEntry, XmlMapping -from .utils import add_list +from .utils import add_list, add_url_if_valid NUGET_TABLE_PATH = os.path.join(_DATA_DIR, "nuget.csv") with open(NUGET_TABLE_PATH) as fd: (CODEMETA_TERMS, NUGET_TABLE) = read_crosstable(fd) SPDX = URIRef("https://spdx.org/licenses/") class NuGetMapping(XmlMapping, BaseIntrinsicMapping): """ dedicated class for NuGet (.nuspec) mapping and translation """ name = "nuget" mapping = NUGET_TABLE["NuGet"] mapping["copyright"] = URIRef("http://schema.org/copyrightNotice") mapping["language"] = URIRef("http://schema.org/inLanguage") string_fields = [ "description", "version", "name", "tags", "license", "summary", "copyright", "language", ] uri_fields = ["projectUrl", "licenseUrl"] @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].endswith(b".nuspec"): return [entry["sha1"]] return [] def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]: return super()._translate_dict(d.get("package", {}).get("metadata", {})) def translate_repository(self, graph, root, v): if isinstance(v, dict) and isinstance(v["@url"], str): codemeta_key = URIRef(self.mapping["repository.url"]) - graph.add((root, codemeta_key, URIRef(v["@url"]))) + add_url_if_valid(graph, root, codemeta_key, v["@url"]) def normalize_license(self, v): if isinstance(v, dict) and v["@type"] == "expression": license_string = v["#text"] if not bool( re.search(r" with |\(|\)| and ", license_string, re.IGNORECASE) ): return [ SPDX + license_type.strip() for license_type in re.split( r" or ", license_string, flags=re.IGNORECASE ) ] else: return None def translate_authors(self, graph: Graph, root, s): if isinstance(s, str): authors = [] for author_name in s.split(","): author_name = author_name.strip() author = BNode() graph.add((author, RDF.type, SCHEMA.Person)) graph.add((author, SCHEMA.name, Literal(author_name))) authors.append(author) add_list(graph, root, SCHEMA.author, authors) def translate_releaseNotes(self, graph: Graph, root, s): if isinstance(s, str): graph.add((root, SCHEMA.releaseNotes, Literal(s))) def normalize_tags(self, s): if isinstance(s, str): return [Literal(tag) for tag in s.split(" ")] diff --git a/swh/indexer/metadata_dictionary/utils.py b/swh/indexer/metadata_dictionary/utils.py index 173b146..8a5fdb9 100644 --- a/swh/indexer/metadata_dictionary/utils.py +++ b/swh/indexer/metadata_dictionary/utils.py @@ -1,72 +1,112 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json -from typing import Callable, Iterable, Optional, Sequence, TypeVar +from typing import Any, Callable, Iterable, Optional, Sequence, TypeVar +import urllib.parse from pyld import jsonld from rdflib import RDF, Graph, URIRef import rdflib.term from swh.indexer.codemeta import _document_loader def prettyprint_graph(graph: Graph, root: URIRef): s = graph.serialize(format="application/ld+json") jsonld_graph = json.loads(s) translated_metadata = jsonld.frame( jsonld_graph, {"@id": str(root)}, options={ "documentLoader": _document_loader, "processingMode": "json-ld-1.1", }, ) print(json.dumps(translated_metadata, indent=4)) def add_list( graph: Graph, subject: rdflib.term.Node, predicate: rdflib.term.Identifier, objects: Sequence[rdflib.term.Node], ) -> None: """Adds triples to the ``graph`` so that they are equivalent to this JSON-LD object:: { "@id": subject, predicate: {"@list": objects} } This is a naive implementation of https://json-ld.org/spec/latest/json-ld-api/#list-to-rdf-conversion """ # JSON-LD's @list is syntactic sugar for a linked list / chain in the RDF graph, # which is what we are going to construct, starting from the end: last_link: rdflib.term.Node last_link = RDF.nil for item in reversed(objects): link = rdflib.BNode() graph.add((link, RDF.first, item)) graph.add((link, RDF.rest, last_link)) last_link = link graph.add((subject, predicate, last_link)) TValue = TypeVar("TValue") def add_map( graph: Graph, subject: rdflib.term.Node, predicate: rdflib.term.Identifier, f: Callable[[Graph, TValue], Optional[rdflib.term.Node]], values: Iterable[TValue], ) -> None: """Helper for :func:`add_list` that takes a mapper function ``f``.""" nodes = [f(graph, value) for value in values] add_list(graph, subject, predicate, [node for node in nodes if node]) + + +def add_url_if_valid( + graph: Graph, + subject: rdflib.term.Node, + predicate: rdflib.term.Identifier, + url: Any, +) -> None: + """Adds ``(subject, predicate, url)`` to the graph if ``url`` is well-formed. + + This is meant as a workaround for https://github.com/digitalbazaar/pyld/issues/91 + to drop URLs that are blatantly invalid early, so PyLD does not crash. + + >>> from pprint import pprint + >>> graph = Graph() + >>> subject = rdflib.term.URIRef("http://example.org/test-software") + >>> predicate = rdflib.term.URIRef("http://schema.org/license") + >>> add_url_if_valid( + ... graph, subject, predicate, "https//www.apache.org/licenses/LICENSE-2.0.txt" + ... ) + >>> add_url_if_valid( + ... graph, subject, predicate, "http:s//www.apache.org/licenses/LICENSE-2.0.txt" + ... ) + >>> add_url_if_valid( + ... graph, subject, predicate, "https://www.apache.org/licenses/LICENSE-2.0.txt" + ... ) + >>> add_url_if_valid( + ... graph, subject, predicate, 42 + ... ) + >>> pprint(set(graph.triples((subject, predicate, None)))) + {(rdflib.term.URIRef('http://example.org/test-software'), + rdflib.term.URIRef('http://schema.org/license'), + rdflib.term.URIRef('https://www.apache.org/licenses/LICENSE-2.0.txt'))} + """ + if not isinstance(url, str): + return + if " " in url or not urllib.parse.urlparse(url).netloc: + return + graph.add((subject, predicate, rdflib.term.URIRef(url))) diff --git a/swh/indexer/tests/metadata_dictionary/test_npm.py b/swh/indexer/tests/metadata_dictionary/test_npm.py index 804ac64..9b52bfd 100644 --- a/swh/indexer/tests/metadata_dictionary/test_npm.py +++ b/swh/indexer/tests/metadata_dictionary/test_npm.py @@ -1,438 +1,449 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from hypothesis import HealthCheck, given, settings import pytest from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.storage.model import ContentMetadataRow from ..test_metadata import TRANSLATOR_TOOL, ContentMetadataTestIndexer from ..utils import ( BASE_TEST_CONFIG, MAPPING_DESCRIPTION_CONTENT_SHA1, json_document_strategy, ) def test_compute_metadata_none(): """ testing content empty content is empty should return None """ content = b"" # None if no metadata was found or an error occurred declared_metadata = None result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result def test_compute_metadata_npm(): """ testing only computation of metadata with hard_mapping_npm """ content = b""" { "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "repository": { "type": "git", "url": "https://github.com/moranegg/metadata_test" }, "author": { "email": "moranegg@example.com", "name": "Morane G" } } """ declared_metadata = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "name": "test_metadata", "version": "0.0.2", "description": "Simple package.json test for indexer", "codeRepository": "git+https://github.com/moranegg/metadata_test", "author": [ { "type": "Person", "name": "Morane G", "email": "moranegg@example.com", } ], } result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result def test_compute_metadata_invalid_description_npm(): """ testing only computation of metadata with hard_mapping_npm """ content = b""" { "name": "test_metadata", "version": "0.0.2", "description": 1234 } """ declared_metadata = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "name": "test_metadata", "version": "0.0.2", } result = MAPPINGS["NpmMapping"]().translate(content) assert declared_metadata == result def test_index_content_metadata_npm(storage, obj_storage): """ testing NPM with package.json - one sha1 uses a file that can't be translated to metadata and should return None in the translated metadata """ sha1s = [ MAPPING_DESCRIPTION_CONTENT_SHA1["json:test-metadata-package.json"], MAPPING_DESCRIPTION_CONTENT_SHA1["json:npm-package.json"], MAPPING_DESCRIPTION_CONTENT_SHA1["python:code"], ] # this metadata indexer computes only metadata for package.json # in npm context with a hard mapping config = BASE_TEST_CONFIG.copy() config["tools"] = [TRANSLATOR_TOOL] metadata_indexer = ContentMetadataTestIndexer(config=config) metadata_indexer.run(sha1s, log_suffix="unknown content") results = list(metadata_indexer.idx_storage.content_metadata_get(sha1s)) expected_results = [ ContentMetadataRow( id=sha1s[0], tool=TRANSLATOR_TOOL, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "codeRepository": "git+https://github.com/moranegg/metadata_test", "description": "Simple package.json test for indexer", "name": "test_metadata", "version": "0.0.1", }, ), ContentMetadataRow( id=sha1s[1], tool=TRANSLATOR_TOOL, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "issueTracker": "https://github.com/npm/npm/issues", "author": [ { "type": "Person", "name": "Isaac Z. Schlueter", "email": "i@izs.me", "url": "http://blog.izs.me", } ], "codeRepository": "git+https://github.com/npm/npm", "description": "a package manager for JavaScript", "license": "https://spdx.org/licenses/Artistic-2.0", "version": "5.0.3", "name": "npm", "url": "https://docs.npmjs.com/", }, ), ] for result in results: del result.tool["id"] result.metadata.pop("keywords", None) # The assertion below returns False sometimes because of nested lists assert expected_results == results def test_npm_null_list_item_normalization(): package_json = b"""{ "name": "foo", "keywords": [ "foo", null ], "homepage": [ "http://example.org/", null ] }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "type": "SoftwareSourceCode", "url": "http://example.org/", "keywords": "foo", } def test_npm_bugs_normalization(): # valid dictionary package_json = b"""{ "name": "foo", "bugs": { "url": "https://github.com/owner/project/issues", "email": "foo@example.com" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "issueTracker": "https://github.com/owner/project/issues", "type": "SoftwareSourceCode", } # "invalid" dictionary package_json = b"""{ "name": "foo", "bugs": { "email": "foo@example.com" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "type": "SoftwareSourceCode", } # string package_json = b"""{ "name": "foo", "bugs": "https://github.com/owner/project/issues" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "issueTracker": "https://github.com/owner/project/issues", "type": "SoftwareSourceCode", } def test_npm_repository_normalization(): # normal package_json = b"""{ "name": "foo", "repository": { "type" : "git", "url" : "https://github.com/npm/cli.git" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://github.com/npm/cli.git", "type": "SoftwareSourceCode", } # missing url package_json = b"""{ "name": "foo", "repository": { "type" : "git" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "type": "SoftwareSourceCode", } # github shortcut package_json = b"""{ "name": "foo", "repository": "github:npm/cli" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) expected_result = { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://github.com/npm/cli.git", "type": "SoftwareSourceCode", } assert result == expected_result # github shortshortcut package_json = b"""{ "name": "foo", "repository": "npm/cli" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == expected_result # gitlab shortcut package_json = b"""{ "name": "foo", "repository": "gitlab:user/repo" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "name": "foo", "codeRepository": "git+https://gitlab.com/user/repo.git", "type": "SoftwareSourceCode", } def test_npm_author(): package_json = rb"""{ "version": "1.0.0", "author": "Foo Bar (@example)" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "Foo Bar", "type": "Person"}], "version": "1.0.0", } def test_npm_invalid_uris(): package_json = rb"""{ "version": "1.0.0", "homepage": "", "author": { "name": "foo", "url": "http://example.org" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "foo", "type": "Person", "url": "http://example.org"}], "version": "1.0.0", } package_json = rb"""{ "version": "1.0.0", "homepage": "http://example.org", "author": { "name": "foo", "url": "" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "foo", "type": "Person"}], "url": "http://example.org", "version": "1.0.0", } package_json = rb"""{ "version": "1.0.0", "homepage": "", "author": { "name": "foo", "url": "" }, "bugs": "" }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "foo", "type": "Person"}], "version": "1.0.0", } package_json = rb"""{ "version": "1.0.0", "homepage": "http:example.org", "author": { "name": "foo", "url": "http:example.com" }, "bugs": { "url": "http:example.com" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "foo", "type": "Person"}], "version": "1.0.0", } + package_json = rb"""{ + "version": "1.0.0", + "repository": "git+https://g ithub.com/foo/bar.git" +}""" + result = MAPPINGS["NpmMapping"]().translate(package_json) + assert result == { + "@context": "https://doi.org/10.5063/schema/codemeta-2.0", + "type": "SoftwareSourceCode", + "version": "1.0.0", + } + def test_npm_invalid_licenses(): package_json = rb"""{ "version": "1.0.0", "license": "SEE LICENSE IN LICENSE.md", "author": { "name": "foo", "url": "http://example.org" } }""" result = MAPPINGS["NpmMapping"]().translate(package_json) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "type": "SoftwareSourceCode", "author": [{"name": "foo", "type": "Person", "url": "http://example.org"}], "version": "1.0.0", } @settings(suppress_health_check=[HealthCheck.too_slow]) @given(json_document_strategy(keys=list(MAPPINGS["NpmMapping"].mapping))) # type: ignore def test_npm_adversarial(doc): raw = json.dumps(doc).encode() MAPPINGS["NpmMapping"]().translate(raw) @pytest.mark.parametrize( "filename", [b"package.json", b"Package.json", b"PACKAGE.json", b"PACKAGE.JSON"] ) def test_detect_metadata_package_json(filename): df = [ { "sha1_git": b"abc", "name": b"index.js", "target": b"abc", "length": 897, "status": "visible", "type": "file", "perms": 33188, "dir_id": b"dir_a", "sha1": b"bcd", }, { "sha1_git": b"aab", "name": filename, "target": b"aab", "length": 712, "status": "visible", "type": "file", "perms": 33188, "dir_id": b"dir_a", "sha1": b"cde", }, ] results = detect_metadata(df) expected_results = {"NpmMapping": [b"cde"]} assert expected_results == results