diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index b4e781a..7d63e0e 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,405 +1,404 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
-import urllib.parse
import uuid
import xml.parsers.expat
from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml
from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1
+from .utils import add_url_if_valid
+
TMP_ROOT_URI_PREFIX = "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
"""Prefix used to generate temporary URIs for root nodes being translated."""
class DirectoryLsEntry(TypedDict):
target: Sha1
sha1: Sha1
name: bytes
type: str
TTranslateCallable = TypeVar(
"TTranslateCallable",
bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None],
)
def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
"""Returns a decorator that marks the decorated function as adding
the given terms to the ``translated_metadata`` dict"""
def decorator(f: TTranslateCallable) -> TTranslateCallable:
if not hasattr(f, "produced_terms"):
f.produced_terms = [] # type: ignore
f.produced_terms.extend(uris) # type: ignore
return f
return decorator
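# Illustrative use of ``produce_terms`` (a sketch, not part of this diff): a
# ``translate_*`` method advertises the term(s) it adds to the graph, so that
# ``supported_terms()`` can report them:
#
#     @produce_terms("http://schema.org/author")
#     def translate_author(self, graph, root, v):
#         ...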
class BaseMapping:
"""Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`,
not to be inherited directly."""
def __init__(self, log_suffix=""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
)
@property
def name(self):
"""A name of this mapping, used as an identifier in the
indexer storage."""
raise NotImplementedError(f"{self.__class__.__name__}.name")
def translate(self, raw_content: bytes) -> Optional[Dict]:
"""
Translates content by parsing content from a bytestring containing
mapping-specific data and translating with the appropriate mapping
to JSON-LD using the Codemeta and ForgeFed vocabularies.
Args:
raw_content: raw content to translate
Returns:
translated metadata in JSON friendly form needed for the content
if parseable, :const:`None` otherwise.
"""
raise NotImplementedError(f"{self.__class__.__name__}.translate")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")
class BaseExtrinsicMapping(BaseMapping):
"""Base class for extrinsic_metadata mappings to inherit from
To implement a new mapping:
- inherit this class
- override translate function
"""
@classmethod
def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
"""
Returns the list of extrinsic metadata formats which can be translated
by this mapping
"""
raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata, forgefed=True)
class BaseIntrinsicMapping(BaseMapping):
"""Base class for intrinsic-metadata mappings to inherit from
To implement a new mapping:
- inherit this class
- override translate function
"""
@classmethod
def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
"""
Returns the sha1 hashes of files which can be translated by this mapping
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata, forgefed=False)
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
"""Base class for all intrinsic metadata mappings that use a single file as input."""
@property
def filename(self):
"""The .json file to extract metadata from."""
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
for entry in file_entries:
if entry["name"].lower() == cls.filename:
return [entry["sha1"]]
return []
class DictMapping(BaseMapping):
"""Base class for mappings that take as input a file that is mostly
a key-value store (e.g. a shallow JSON dict)."""
string_fields: List[str] = []
"""List of fields that are simple strings, and don't need any
normalization."""
date_fields: List[str] = []
"""List of fields that are strings that should be typed as http://schema.org/Date
"""
uri_fields: List[str] = []
"""List of fields that are simple URIs, and don't need any
normalization."""
@property
def mapping(self):
"""A translation dict to map dict keys into a canonical name."""
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls):
# one-to-one mapping from the original key to a CodeMeta term
simple_terms = {
str(term)
for (key, term) in cls.mapping.items()
if key in cls.string_fields + cls.date_fields + cls.uri_fields
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
# more complex mapping from the original key to JSON-LD
complex_terms = {
str(term)
for meth_name in dir(cls)
if meth_name.startswith("translate_")
for term in getattr(getattr(cls, meth_name), "produced_terms", [])
}
return simple_terms | complex_terms
def get_root_uri(self, content_dict: Dict) -> rdflib.URIRef:
"""Returns an URI for the SoftwareSourceCode or Repository being described.
The default implementation uses a temporary URI that is stripped before
normalization by :meth:`_translate_dict`.
"""
# The main object being described (the SoftwareSourceCode) may or may not
# have an id.
# If it does, it will need to be set by a subclass.
# If it doesn't we temporarily use this URI to identify it. Unfortunately,
# we cannot use a blank node as we need to use it for JSON-LD framing later,
# and blank nodes cannot be used for framing in JSON-LD >= 1.1
root_id = TMP_ROOT_URI_PREFIX + str(uuid.uuid4())
return rdflib.URIRef(root_id)
def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
Args:
content_dict (dict): content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
graph = rdflib.Graph()
root = self.get_root_uri(content_dict)
self._translate_to_graph(graph, root, content_dict)
self.sanitize(graph)
# Convert from rdflib's internal graph representation to JSON
s = graph.serialize(format="application/ld+json")
# Load from JSON to a list of Python objects
jsonld_graph = json.loads(s)
# Use JSON-LD framing to turn the graph into a rooted tree
# frame = {"@type": str(SCHEMA.SoftwareSourceCode)}
translated_metadata = jsonld.frame(
jsonld_graph,
{"@id": str(root)},
options={
"documentLoader": _document_loader,
"processingMode": "json-ld-1.1",
},
)
# Remove the temporary id we added at the beginning
assert isinstance(translated_metadata["@id"], str)
if translated_metadata["@id"].startswith(TMP_ROOT_URI_PREFIX):
del translated_metadata["@id"]
return self.normalize_translation(translated_metadata)
def _translate_to_graph(
self, graph: rdflib.Graph, root: rdflib.term.Identifier, content_dict: Dict
) -> None:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping to the graph passed as parameter
Args:
content_dict (dict): content dict to translate
"""
graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode))
for k, v in content_dict.items():
# First, check if there is a specific translation
# method for this key
translation_method = getattr(
self, "translate_" + self._normalize_method_name(k), None
)
if translation_method:
translation_method(graph, root, v)
elif k in self.mapping:
# if there is no method, but the key is known from the
# crosswalk table
codemeta_key = self.mapping[k]
# if there is a normalization method, use it on the value,
# and add its results to the triples
normalization_method = getattr(
self, "normalize_" + self._normalize_method_name(k), None
)
if normalization_method:
v = normalization_method(v)
if v is None:
pass
elif isinstance(v, list):
for item in reversed(v):
- graph.add((root, codemeta_key, item))
+ if isinstance(item, rdflib.URIRef):
+ add_url_if_valid(graph, root, codemeta_key, str(item))
+ else:
+ graph.add((root, codemeta_key, item))
else:
- graph.add((root, codemeta_key, v))
+ if isinstance(v, rdflib.URIRef):
+ add_url_if_valid(graph, root, codemeta_key, str(v))
+ else:
+ graph.add((root, codemeta_key, v))
elif k in self.string_fields and isinstance(v, str):
graph.add((root, codemeta_key, rdflib.Literal(v)))
elif k in self.string_fields and isinstance(v, list):
for item in v:
graph.add((root, codemeta_key, rdflib.Literal(item)))
elif k in self.date_fields and isinstance(v, str):
typed_v = rdflib.Literal(v, datatype=SCHEMA.Date)
graph.add((root, codemeta_key, typed_v))
elif k in self.date_fields and isinstance(v, list):
for item in v:
if isinstance(item, str):
typed_item = rdflib.Literal(item, datatype=SCHEMA.Date)
graph.add((root, codemeta_key, typed_item))
elif k in self.uri_fields and isinstance(v, str):
- # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop
- # URLs that are blatantly invalid early, so PyLD does not crash.
- parsed_url = urllib.parse.urlparse(v)
- if parsed_url.netloc:
- graph.add((root, codemeta_key, rdflib.URIRef(v)))
+ add_url_if_valid(graph, root, codemeta_key, v)
elif k in self.uri_fields and isinstance(v, list):
for item in v:
- if isinstance(item, str):
- # ditto
- parsed_url = urllib.parse.urlparse(item)
- if parsed_url.netloc:
- graph.add((root, codemeta_key, rdflib.URIRef(item)))
+ add_url_if_valid(graph, root, codemeta_key, item)
else:
continue
self.extra_translation(graph, root, content_dict)
def sanitize(self, graph: rdflib.Graph) -> None:
# Remove triples that make PyLD crash
for (subject, predicate, _) in graph.triples((None, None, rdflib.URIRef(""))):
graph.remove((subject, predicate, rdflib.URIRef("")))
# Should not happen, but we'd better check, as this may lead to incorrect data
invalid = False
for triple in graph.triples((rdflib.URIRef(""), None, None)):
invalid = True
logging.error("Empty triple subject URI: %r", triple)
if invalid:
raise ValueError("Empty triple subject(s)")
def extra_translation(
self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any]
) -> None:
"""Called at the end of the translation process, and may add arbitrary triples
to ``graph`` based on the input dictionary (passed as ``d``).
"""
pass
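# Illustrative sketch of a ``DictMapping`` subclass (hypothetical mapping, not
# part of this diff): keys listed in ``string_fields``/``date_fields``/
# ``uri_fields`` are translated automatically through the ``mapping``
# crosswalk, while ``normalize_*``/``translate_*`` methods handle the rest:
#
#     class ExampleMapping(DictMapping):
#         name = "example"
#         mapping = {"summary": SCHEMA.description, "website": SCHEMA.url}
#         string_fields = ["summary"]
#         uri_fields = ["website"]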
class JsonMapping(DictMapping):
"""Base class for all mappings that use JSON data as input."""
def translate(self, raw_content: bytes) -> Optional[Dict]:
try:
raw_content_string: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
return None
try:
content_dict = json.loads(raw_content_string)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
return None
class XmlMapping(DictMapping):
"""Base class for all mappings that use XML data as input."""
def translate(self, raw_content: bytes) -> Optional[Dict]:
try:
d = xmltodict.parse(raw_content)
except xml.parsers.expat.ExpatError:
self.log.warning("Error parsing XML from %s", self.log_suffix)
return None
except UnicodeDecodeError:
self.log.warning("Error unidecoding XML from %s", self.log_suffix)
return None
except (LookupError, ValueError):
# unknown encoding or multi-byte encoding
self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
return None
if not isinstance(d, dict):
self.log.warning("Skipping ill-formed XML content: %s", raw_content)
return None
return self._translate_dict(d)
class SafeLoader(yaml.SafeLoader):
yaml_implicit_resolvers = {
k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
}
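# With the timestamp resolver removed, dates are loaded as plain strings
# rather than datetime objects, e.g. (illustrative):
#
#     >>> yaml.load("date-released: 2022-10-01", Loader=SafeLoader)
#     {'date-released': '2022-10-01'}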
class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
"""Base class for all mappings that use Yaml data as input."""
def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
raw_content_string: str = raw_content.decode()
try:
content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
except yaml.scanner.ScannerError:
return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
return None
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
index 8ced23e..78ba661 100644
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,60 +1,65 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import List
+import urllib.parse
from rdflib import BNode, Graph, Literal, URIRef
import rdflib.term
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import RDF, SCHEMA
from .base import YamlMapping
from .utils import add_map
DOI = URIRef("https://doi.org/")
SPDX = URIRef("https://spdx.org/licenses/")
class CffMapping(YamlMapping):
"""Dedicated class for Citation (CITATION.cff) mapping and translation"""
name = "cff"
filename = b"CITATION.cff"
mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
string_fields = ["keywords", "license", "abstract", "version", "doi"]
date_fields = ["date-released"]
uri_fields = ["repository-code"]
def _translate_author(self, graph: Graph, author: dict) -> rdflib.term.Node:
node: rdflib.term.Node
- if "orcid" in author and isinstance(author["orcid"], str):
+ if (
+ "orcid" in author
+ and isinstance(author["orcid"], str)
+ and urllib.parse.urlparse(author["orcid"]).netloc
+ ):
node = URIRef(author["orcid"])
else:
node = BNode()
graph.add((node, RDF.type, SCHEMA.Person))
if "affiliation" in author and isinstance(author["affiliation"], str):
affiliation = BNode()
graph.add((node, SCHEMA.affiliation, affiliation))
graph.add((affiliation, RDF.type, SCHEMA.Organization))
graph.add((affiliation, SCHEMA.name, Literal(author["affiliation"])))
if "family-names" in author and isinstance(author["family-names"], str):
graph.add((node, SCHEMA.familyName, Literal(author["family-names"])))
if "given-names" in author and isinstance(author["given-names"], str):
graph.add((node, SCHEMA.givenName, Literal(author["given-names"])))
return node
def translate_authors(
self, graph: Graph, root: URIRef, authors: List[dict]
) -> None:
add_map(graph, root, SCHEMA.author, self._translate_author, authors)
def normalize_doi(self, s: str) -> URIRef:
if isinstance(s, str):
return DOI + s
def normalize_license(self, s: str) -> URIRef:
if isinstance(s, str):
return SPDX + s
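# For instance (illustrative, not part of this diff):
#
#     >>> CffMapping().normalize_doi("10.5281/zenodo.1234")
#     rdflib.term.URIRef('https://doi.org/10.5281/zenodo.1234')
#     >>> CffMapping().normalize_license("Apache-2.0")
#     rdflib.term.URIRef('https://spdx.org/licenses/Apache-2.0')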
diff --git a/swh/indexer/metadata_dictionary/github.py b/swh/indexer/metadata_dictionary/github.py
index 25f6eff..0435c41 100644
--- a/swh/indexer/metadata_dictionary/github.py
+++ b/swh/indexer/metadata_dictionary/github.py
@@ -1,137 +1,136 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Any, Tuple
from rdflib import RDF, BNode, Graph, Literal, URIRef
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import ACTIVITYSTREAMS, CODEMETA, FORGEFED, SCHEMA
from .base import BaseExtrinsicMapping, JsonMapping, produce_terms
-from .utils import prettyprint_graph # noqa
+from .utils import add_url_if_valid, prettyprint_graph # noqa
SPDX = URIRef("https://spdx.org/licenses/")
class GitHubMapping(BaseExtrinsicMapping, JsonMapping):
name = "github"
mapping = {
**CROSSWALK_TABLE["GitHub"],
"topics": SCHEMA.keywords, # TODO: submit this to the official crosswalk
"clone_url": SCHEMA.codeRepository,
}
uri_fields = [
"clone_url",
]
date_fields = [
"created_at",
"updated_at",
]
string_fields = [
"description",
"full_name",
"topics",
]
@classmethod
def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
return ("application/vnd.github.v3+json",)
def extra_translation(self, graph, root, content_dict):
graph.remove((root, RDF.type, SCHEMA.SoftwareSourceCode))
graph.add((root, RDF.type, FORGEFED.Repository))
if content_dict.get("has_issues"):
- graph.add(
- (
- root,
- CODEMETA.issueTracker,
- URIRef(content_dict["html_url"] + "/issues"),
- )
+ add_url_if_valid(
+ graph,
+ root,
+ CODEMETA.issueTracker,
+ URIRef(content_dict["html_url"] + "/issues"),
)
def get_root_uri(self, content_dict: dict) -> URIRef:
if isinstance(content_dict.get("html_url"), str):
return URIRef(content_dict["html_url"])
else:
raise ValueError(
f"GitHub metadata has missing/invalid html_url: {content_dict}"
)
@produce_terms(FORGEFED.forks, ACTIVITYSTREAMS.totalItems)
def translate_forks_count(self, graph: Graph, root: BNode, v: Any) -> None:
"""
>>> graph = Graph()
>>> root = URIRef("http://example.org/test-software")
>>> GitHubMapping().translate_forks_count(graph, root, 42)
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"https://forgefed.org/ns#forks": {
"@type": "https://www.w3.org/ns/activitystreams#OrderedCollection",
"https://www.w3.org/ns/activitystreams#totalItems": 42
}
}
"""
if isinstance(v, int):
collection = BNode()
graph.add((root, FORGEFED.forks, collection))
graph.add((collection, RDF.type, ACTIVITYSTREAMS.OrderedCollection))
graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v)))
@produce_terms(ACTIVITYSTREAMS.likes, ACTIVITYSTREAMS.totalItems)
def translate_stargazers_count(self, graph: Graph, root: BNode, v: Any) -> None:
"""
>>> graph = Graph()
>>> root = URIRef("http://example.org/test-software")
>>> GitHubMapping().translate_stargazers_count(graph, root, 42)
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"https://www.w3.org/ns/activitystreams#likes": {
"@type": "https://www.w3.org/ns/activitystreams#Collection",
"https://www.w3.org/ns/activitystreams#totalItems": 42
}
}
"""
if isinstance(v, int):
collection = BNode()
graph.add((root, ACTIVITYSTREAMS.likes, collection))
graph.add((collection, RDF.type, ACTIVITYSTREAMS.Collection))
graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v)))
@produce_terms(ACTIVITYSTREAMS.followers, ACTIVITYSTREAMS.totalItems)
def translate_watchers_count(self, graph: Graph, root: BNode, v: Any) -> None:
"""
>>> graph = Graph()
>>> root = URIRef("http://example.org/test-software")
>>> GitHubMapping().translate_watchers_count(graph, root, 42)
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"https://www.w3.org/ns/activitystreams#followers": {
"@type": "https://www.w3.org/ns/activitystreams#Collection",
"https://www.w3.org/ns/activitystreams#totalItems": 42
}
}
"""
if isinstance(v, int):
collection = BNode()
graph.add((root, ACTIVITYSTREAMS.followers, collection))
graph.add((collection, RDF.type, ACTIVITYSTREAMS.Collection))
graph.add((collection, ACTIVITYSTREAMS.totalItems, Literal(v)))
def normalize_license(self, d):
"""
>>> GitHubMapping().normalize_license({'spdx_id': 'MIT'})
rdflib.term.URIRef('https://spdx.org/licenses/MIT')
"""
if isinstance(d, dict) and isinstance(d.get("spdx_id"), str):
return SPDX + d["spdx_id"]
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index 8b3e48d..5575ba9 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,162 +1,163 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict
-from rdflib import Graph, Literal, URIRef
+from rdflib import Graph, Literal
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import SCHEMA
from .base import SingleFileIntrinsicMapping, XmlMapping
-from .utils import prettyprint_graph # noqa
+from .utils import add_url_if_valid, prettyprint_graph # noqa
class MavenMapping(XmlMapping, SingleFileIntrinsicMapping):
"""
dedicated class for Maven (pom.xml) mapping and translation
"""
name = "maven"
filename = b"pom.xml"
mapping = CROSSWALK_TABLE["Java (Maven)"]
string_fields = ["name", "version", "description", "email"]
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
return super()._translate_dict(d.get("project") or {})
def extra_translation(self, graph: Graph, root, d):
self.parse_repositories(graph, root, d)
def parse_repositories(self, graph: Graph, root, d):
"""https://maven.apache.org/pom.html#Repositories
>>> import rdflib
>>> import xmltodict
>>> from pprint import pprint
>>> d = xmltodict.parse('''
... <repositories>
...   <repository>
...     <id>codehausSnapshots</id>
...     <name>Codehaus Snapshots</name>
...     <url>http://snapshots.maven.codehaus.org/maven2</url>
...     <layout>default</layout>
...   </repository>
... </repositories>
... ''')
>>> MavenMapping().parse_repositories(rdflib.Graph(), rdflib.BNode(), d)
"""
repositories = d.get("repositories")
if not repositories:
self.parse_repository(graph, root, d, self._default_repository)
elif isinstance(repositories, dict):
repositories = repositories.get("repository") or []
if not isinstance(repositories, list):
repositories = [repositories]
for repo in repositories:
self.parse_repository(graph, root, d, repo)
def parse_repository(self, graph: Graph, root, d, repo):
if not isinstance(repo, dict):
return
if repo.get("layout", "default") != "default":
return # TODO ?
url = repo.get("url")
group_id = d.get("groupId")
artifact_id = d.get("artifactId")
if (
isinstance(url, str)
and isinstance(group_id, str)
and isinstance(artifact_id, str)
):
repo = os.path.join(url, *group_id.split("."), artifact_id)
if "${" in repo:
# Often used for templating in pom.xml files collected from VCSs
return
- graph.add((root, SCHEMA.codeRepository, URIRef(repo)))
+ add_url_if_valid(graph, root, SCHEMA.codeRepository, repo)
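# For example (illustrative): url="https://repo.maven.apache.org/maven2/",
# groupId="org.example" and artifactId="foo" yield the codeRepository
# "https://repo.maven.apache.org/maven2/org/example/foo".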
def normalize_groupId(self, id_):
"""https://maven.apache.org/pom.html#Maven_Coordinates
>>> MavenMapping().normalize_groupId('org.example')
rdflib.term.Literal('org.example')
"""
if isinstance(id_, str):
return Literal(id_)
def translate_licenses(self, graph, root, licenses):
"""https://maven.apache.org/pom.html#Licenses
>>> import xmltodict
>>> import json
+ >>> from rdflib import URIRef
>>> d = xmltodict.parse('''
... <licenses>
...   <license>
...     <name>Apache License, Version 2.0</name>
...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
...   </license>
... </licenses>
... ''')
>>> print(json.dumps(d, indent=4))
{
"licenses": {
"license": {
"name": "Apache License, Version 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
}
>>> graph = Graph()
>>> root = URIRef("http://example.org/test-software")
>>> MavenMapping().translate_licenses(graph, root, d["licenses"])
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"http://schema.org/license": {
"@id": "https://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
or, if there is more than one license:
>>> import xmltodict
>>> from pprint import pprint
>>> d = xmltodict.parse('''
... <licenses>
...   <license>
...     <name>Apache License, Version 2.0</name>
...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
...   </license>
...   <license>
...     <name>MIT License</name>
...     <url>https://opensource.org/licenses/MIT</url>
...   </license>
... </licenses>
... ''')
>>> graph = Graph()
>>> root = URIRef("http://example.org/test-software")
>>> MavenMapping().translate_licenses(graph, root, d["licenses"])
>>> pprint(set(graph.triples((root, URIRef("http://schema.org/license"), None))))
{(rdflib.term.URIRef('http://example.org/test-software'),
rdflib.term.URIRef('http://schema.org/license'),
rdflib.term.URIRef('https://opensource.org/licenses/MIT')),
(rdflib.term.URIRef('http://example.org/test-software'),
rdflib.term.URIRef('http://schema.org/license'),
rdflib.term.URIRef('https://www.apache.org/licenses/LICENSE-2.0.txt'))}
"""
if not isinstance(licenses, dict):
return
licenses = licenses.get("license")
if isinstance(licenses, dict):
licenses = [licenses]
elif not isinstance(licenses, list):
return
for license in licenses:
- if isinstance(license, dict) and isinstance(license.get("url"), str):
- graph.add((root, SCHEMA.license, URIRef(license["url"])))
+ if isinstance(license, dict):
+ add_url_if_valid(graph, root, SCHEMA.license, license.get("url"))
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
index fd627b7..b838e5a 100644
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -1,298 +1,288 @@
# Copyright (C) 2018-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
-import urllib.parse
from rdflib import RDF, BNode, Graph, Literal, URIRef
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import SCHEMA
from .base import JsonMapping, SingleFileIntrinsicMapping
-from .utils import add_list, prettyprint_graph # noqa
+from .utils import add_list, add_url_if_valid, prettyprint_graph # noqa
SPDX = URIRef("https://spdx.org/licenses/")
class NpmMapping(JsonMapping, SingleFileIntrinsicMapping):
"""
dedicated class for NPM (package.json) mapping and translation
"""
name = "npm"
mapping = CROSSWALK_TABLE["NodeJS"]
filename = b"package.json"
string_fields = ["name", "version", "description", "email"]
uri_fields = ["homepage"]
_schema_shortcuts = {
"github": "git+https://github.com/%s.git",
"gist": "git+https://gist.github.com/%s.git",
"gitlab": "git+https://gitlab.com/%s.git",
# Bitbucket supports both hg and git, and the shortcut does not
# tell which one to use.
# 'bitbucket': 'https://bitbucket.org/',
}
def normalize_repository(self, d):
"""https://docs.npmjs.com/files/package.json#repository
>>> NpmMapping().normalize_repository({
... 'type': 'git',
... 'url': 'https://example.org/foo.git'
... })
rdflib.term.URIRef('git+https://example.org/foo.git')
>>> NpmMapping().normalize_repository(
... 'gitlab:foo/bar')
rdflib.term.URIRef('git+https://gitlab.com/foo/bar.git')
>>> NpmMapping().normalize_repository(
... 'foo/bar')
rdflib.term.URIRef('git+https://github.com/foo/bar.git')
"""
if (
isinstance(d, dict)
and isinstance(d.get("type"), str)
and isinstance(d.get("url"), str)
):
url = "{type}+{url}".format(**d)
elif isinstance(d, str):
if "://" in d:
url = d
elif ":" in d:
(schema, rest) = d.split(":", 1)
if schema in self._schema_shortcuts:
url = self._schema_shortcuts[schema] % rest
else:
return None
else:
url = self._schema_shortcuts["github"] % d
else:
return None
return URIRef(url)
def normalize_bugs(self, d):
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
... 'url': 'https://example.org/bugs/',
... 'email': 'bugs@example.org'
... })
rdflib.term.URIRef('https://example.org/bugs/')
>>> NpmMapping().normalize_bugs(
... 'https://example.org/bugs/')
rdflib.term.URIRef('https://example.org/bugs/')
"""
if isinstance(d, dict) and isinstance(d.get("url"), str):
url = d["url"]
elif isinstance(d, str):
url = d
else:
url = ""
- parsed_url = urllib.parse.urlparse(url)
- if parsed_url.netloc:
- return URIRef(url)
- else:
- return None
+ return URIRef(url)
_parse_author = re.compile(
r"^ *" r"(?P.*?)" r"( +<(?P.*)>)?" r"( +\((?P.*)\))?" r" *$"
)
def translate_author(self, graph: Graph, root, d):
r"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
>>> root = URIRef("http://example.org/test-software")
>>> graph = Graph()
>>> NpmMapping().translate_author(graph, root, {
... 'name': 'John Doe',
... 'email': 'john.doe@example.org',
... 'url': 'https://example.org/~john.doe',
... })
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"http://schema.org/author": {
"@list": [
{
"@type": "http://schema.org/Person",
"http://schema.org/email": "john.doe@example.org",
"http://schema.org/name": "John Doe",
"http://schema.org/url": {
"@id": "https://example.org/~john.doe"
}
}
]
}
}
>>> graph = Graph()
>>> NpmMapping().translate_author(graph, root,
... 'John Doe <john.doe@example.org> (https://example.org/~john.doe)'
... )
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"http://schema.org/author": {
"@list": [
{
"@type": "http://schema.org/Person",
"http://schema.org/email": "john.doe@example.org",
"http://schema.org/name": "John Doe",
"http://schema.org/url": {
"@id": "https://example.org/~john.doe"
}
}
]
}
}
>>> graph = Graph()
>>> NpmMapping().translate_author(graph, root, {
... 'name': 'John Doe',
... 'email': 'john.doe@example.org',
... 'url': 'https:\\\\example.invalid/~john.doe',
... })
>>> prettyprint_graph(graph, root)
{
"@id": ...,
"http://schema.org/author": {
"@list": [
{
"@type": "http://schema.org/Person",
"http://schema.org/email": "john.doe@example.org",
"http://schema.org/name": "John Doe"
}
]
}
}
""" # noqa
author = BNode()
graph.add((author, RDF.type, SCHEMA.Person))
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
url = d.get("url", None)
elif isinstance(d, str):
match = self._parse_author.match(d)
if not match:
return None
name = match.group("name")
email = match.group("email")
url = match.group("url")
else:
return None
if name and isinstance(name, str):
graph.add((author, SCHEMA.name, Literal(name)))
if email and isinstance(email, str):
graph.add((author, SCHEMA.email, Literal(email)))
- if url and isinstance(url, str):
- # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop
- # URLs that are blatantly invalid early, so PyLD does not crash.
- parsed_url = urllib.parse.urlparse(url)
- if parsed_url.netloc:
- graph.add((author, SCHEMA.url, URIRef(url)))
+ add_url_if_valid(graph, author, SCHEMA.url, url)
add_list(graph, root, SCHEMA.author, [author])
def normalize_description(self, description):
r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat common
mistake that causes issues in the database because of null bytes in JSON.
>>> NpmMapping().normalize_description("foo bar")
rdflib.term.Literal('foo bar')
>>> NpmMapping().normalize_description(
... "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00"
... )
rdflib.term.Literal('foo bar')
>>> NpmMapping().normalize_description(
... "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 "
... )
rdflib.term.Literal('foo bar')
>>> NpmMapping().normalize_description(
... # invalid UTF-16 and meaningless UTF-8:
... "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00"
... ) is None
True
>>> NpmMapping().normalize_description(
... # ditto (but it looks like little-endian at first)
... "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00"
... ) is None
True
>>> NpmMapping().normalize_description(None) is None
True
"""
if not isinstance(description, str):
return None
# XXX: if this function ever needs to support more cases, consider
# switching to https://pypi.org/project/ftfy/ instead of adding more hacks
if description.startswith("\ufffd\ufffd") and "\x00" in description:
# 2 unicode replacement characters followed by '# ' encoded as UTF-16
# is a common mistake, which indicates a README.md was saved as UTF-16,
# and some NPM tool opened it as UTF-8 and used the first line as
# description.
description_bytes = description.encode()
# Strip the two unicode replacement characters
assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd")
description_bytes = description_bytes[6:]
# If the following attempts fail to recover the description, discard it
# entirely because the current indexer storage backend (postgresql) cannot
# store zero bytes in JSON columns.
description = None
if not description_bytes.startswith(b"\x00"):
# try UTF-16 little-endian (the most common) first
try:
description = description_bytes.decode("utf-16le")
except UnicodeDecodeError:
pass
if description is None:
# if it fails, try UTF-16 big-endian
try:
description = description_bytes.decode("utf-16be")
except UnicodeDecodeError:
pass
if description:
if description.startswith("# "):
description = description[2:]
return Literal(description.rstrip())
else:
return None
return Literal(description)
def normalize_license(self, s):
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
rdflib.term.URIRef('https://spdx.org/licenses/MIT')
"""
if isinstance(s, str):
if s.startswith("SEE LICENSE IN "):
# Very common pattern, because it is an example in the specification.
# It is followed by the filename; and the indexer architecture currently
# does not allow accessing that from metadata mappings.
# (Plus, a hypothetical license mapping would eventually pick it up)
return
if " " in s:
# Either an SPDX expression, or unusable data
# TODO: handle it
return
return SPDX + s
def normalize_keywords(self, lst):
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
[rdflib.term.Literal('foo'), rdflib.term.Literal('bar')]
"""
if isinstance(lst, list):
return [Literal(x) for x in lst if isinstance(x, str)]
diff --git a/swh/indexer/metadata_dictionary/nuget.py b/swh/indexer/metadata_dictionary/nuget.py
index 087ec0e..b22100c 100644
--- a/swh/indexer/metadata_dictionary/nuget.py
+++ b/swh/indexer/metadata_dictionary/nuget.py
@@ -1,95 +1,95 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os.path
import re
from typing import Any, Dict, List
from rdflib import RDF, BNode, Graph, Literal, URIRef
from swh.indexer.codemeta import _DATA_DIR, read_crosstable
from swh.indexer.namespaces import SCHEMA
from swh.indexer.storage.interface import Sha1
from .base import BaseIntrinsicMapping, DirectoryLsEntry, XmlMapping
-from .utils import add_list
+from .utils import add_list, add_url_if_valid
NUGET_TABLE_PATH = os.path.join(_DATA_DIR, "nuget.csv")
with open(NUGET_TABLE_PATH) as fd:
(CODEMETA_TERMS, NUGET_TABLE) = read_crosstable(fd)
SPDX = URIRef("https://spdx.org/licenses/")
class NuGetMapping(XmlMapping, BaseIntrinsicMapping):
"""
dedicated class for NuGet (.nuspec) mapping and translation
"""
name = "nuget"
mapping = NUGET_TABLE["NuGet"]
mapping["copyright"] = URIRef("http://schema.org/copyrightNotice")
mapping["language"] = URIRef("http://schema.org/inLanguage")
string_fields = [
"description",
"version",
"name",
"tags",
"license",
"summary",
"copyright",
"language",
]
uri_fields = ["projectUrl", "licenseUrl"]
@classmethod
def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
for entry in file_entries:
if entry["name"].endswith(b".nuspec"):
return [entry["sha1"]]
return []
def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
return super()._translate_dict(d.get("package", {}).get("metadata", {}))
def translate_repository(self, graph, root, v):
if isinstance(v, dict) and isinstance(v["@url"], str):
codemeta_key = URIRef(self.mapping["repository.url"])
- graph.add((root, codemeta_key, URIRef(v["@url"])))
+ add_url_if_valid(graph, root, codemeta_key, v["@url"])
def normalize_license(self, v):
if isinstance(v, dict) and v["@type"] == "expression":
license_string = v["#text"]
if not bool(
re.search(r" with |\(|\)| and ", license_string, re.IGNORECASE)
):
return [
SPDX + license_type.strip()
for license_type in re.split(
r" or ", license_string, flags=re.IGNORECASE
)
]
else:
return None
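# For example (illustrative): {"@type": "expression", "#text": "MIT OR Apache-2.0"}
# yields [SPDX + "MIT", SPDX + "Apache-2.0"], while compound expressions such as
# "(MIT AND BSD-2-Clause)" yield None.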
def translate_authors(self, graph: Graph, root, s):
if isinstance(s, str):
authors = []
for author_name in s.split(","):
author_name = author_name.strip()
author = BNode()
graph.add((author, RDF.type, SCHEMA.Person))
graph.add((author, SCHEMA.name, Literal(author_name)))
authors.append(author)
add_list(graph, root, SCHEMA.author, authors)
def translate_releaseNotes(self, graph: Graph, root, s):
if isinstance(s, str):
graph.add((root, SCHEMA.releaseNotes, Literal(s)))
def normalize_tags(self, s):
if isinstance(s, str):
return [Literal(tag) for tag in s.split(" ")]
diff --git a/swh/indexer/metadata_dictionary/utils.py b/swh/indexer/metadata_dictionary/utils.py
index 173b146..8a5fdb9 100644
--- a/swh/indexer/metadata_dictionary/utils.py
+++ b/swh/indexer/metadata_dictionary/utils.py
@@ -1,72 +1,112 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
-from typing import Callable, Iterable, Optional, Sequence, TypeVar
+from typing import Any, Callable, Iterable, Optional, Sequence, TypeVar
+import urllib.parse
from pyld import jsonld
from rdflib import RDF, Graph, URIRef
import rdflib.term
from swh.indexer.codemeta import _document_loader
def prettyprint_graph(graph: Graph, root: URIRef):
s = graph.serialize(format="application/ld+json")
jsonld_graph = json.loads(s)
translated_metadata = jsonld.frame(
jsonld_graph,
{"@id": str(root)},
options={
"documentLoader": _document_loader,
"processingMode": "json-ld-1.1",
},
)
print(json.dumps(translated_metadata, indent=4))
def add_list(
graph: Graph,
subject: rdflib.term.Node,
predicate: rdflib.term.Identifier,
objects: Sequence[rdflib.term.Node],
) -> None:
"""Adds triples to the ``graph`` so that they are equivalent to this
JSON-LD object::
{
"@id": subject,
predicate: {"@list": objects}
}
This is a naive implementation of
https://json-ld.org/spec/latest/json-ld-api/#list-to-rdf-conversion
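    For example, a two-element list expands into a five-triple chain (sketch)::
        >>> graph = Graph()
        >>> subject = URIRef("http://example.org/subject")
        >>> add_list(graph, subject, URIRef("http://schema.org/keywords"),
        ...          [rdflib.term.Literal("a"), rdflib.term.Literal("b")])
        >>> len(graph)  # 2 rdf:first + 2 rdf:rest + the link from ``subject``
        5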
"""
# JSON-LD's @list is syntactic sugar for a linked list / chain in the RDF graph,
# which is what we are going to construct, starting from the end:
last_link: rdflib.term.Node
last_link = RDF.nil
for item in reversed(objects):
link = rdflib.BNode()
graph.add((link, RDF.first, item))
graph.add((link, RDF.rest, last_link))
last_link = link
graph.add((subject, predicate, last_link))
TValue = TypeVar("TValue")
def add_map(
graph: Graph,
subject: rdflib.term.Node,
predicate: rdflib.term.Identifier,
f: Callable[[Graph, TValue], Optional[rdflib.term.Node]],
values: Iterable[TValue],
) -> None:
"""Helper for :func:`add_list` that takes a mapper function ``f``."""
nodes = [f(graph, value) for value in values]
add_list(graph, subject, predicate, [node for node in nodes if node])
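# Illustrative use of ``add_map`` (a sketch, not part of this diff): build one
# node per value with the mapper and collect them into an RDF list, skipping
# values the mapper turns into None:
#
#     >>> graph = Graph()
#     >>> subject = URIRef("http://example.org/subject")
#     >>> add_map(graph, subject, URIRef("http://schema.org/author"),
#     ...         lambda graph, value: rdflib.term.Literal(value), ["a", "b"])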
+
+
+def add_url_if_valid(
+ graph: Graph,
+ subject: rdflib.term.Node,
+ predicate: rdflib.term.Identifier,
+ url: Any,
+) -> None:
+ """Adds ``(subject, predicate, url)`` to the graph if ``url`` is well-formed.
+
+ This is meant as a workaround for https://github.com/digitalbazaar/pyld/issues/91
+ to drop URLs that are blatantly invalid early, so PyLD does not crash.
+
+ >>> from pprint import pprint
+ >>> graph = Graph()
+ >>> subject = rdflib.term.URIRef("http://example.org/test-software")
+ >>> predicate = rdflib.term.URIRef("http://schema.org/license")
+ >>> add_url_if_valid(
+ ... graph, subject, predicate, "https//www.apache.org/licenses/LICENSE-2.0.txt"
+ ... )
+ >>> add_url_if_valid(
+ ... graph, subject, predicate, "http:s//www.apache.org/licenses/LICENSE-2.0.txt"
+ ... )
+ >>> add_url_if_valid(
+ ... graph, subject, predicate, "https://www.apache.org/licenses/LICENSE-2.0.txt"
+ ... )
+ >>> add_url_if_valid(
+ ... graph, subject, predicate, 42
+ ... )
+ >>> pprint(set(graph.triples((subject, predicate, None))))
+ {(rdflib.term.URIRef('http://example.org/test-software'),
+ rdflib.term.URIRef('http://schema.org/license'),
+ rdflib.term.URIRef('https://www.apache.org/licenses/LICENSE-2.0.txt'))}
+ """
+ if not isinstance(url, str):
+ return
+ if " " in url or not urllib.parse.urlparse(url).netloc:
+ return
+ graph.add((subject, predicate, rdflib.term.URIRef(url)))
diff --git a/swh/indexer/tests/metadata_dictionary/test_npm.py b/swh/indexer/tests/metadata_dictionary/test_npm.py
index 804ac64..9b52bfd 100644
--- a/swh/indexer/tests/metadata_dictionary/test_npm.py
+++ b/swh/indexer/tests/metadata_dictionary/test_npm.py
@@ -1,438 +1,449 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from hypothesis import HealthCheck, given, settings
import pytest
from swh.indexer.metadata_detector import detect_metadata
from swh.indexer.metadata_dictionary import MAPPINGS
from swh.indexer.storage.model import ContentMetadataRow
from ..test_metadata import TRANSLATOR_TOOL, ContentMetadataTestIndexer
from ..utils import (
BASE_TEST_CONFIG,
MAPPING_DESCRIPTION_CONTENT_SHA1,
json_document_strategy,
)
def test_compute_metadata_none():
"""
testing that translating empty content returns None
"""
content = b""
# None if no metadata was found or an error occurred
declared_metadata = None
result = MAPPINGS["NpmMapping"]().translate(content)
assert declared_metadata == result
def test_compute_metadata_npm():
"""
testing only computation of metadata with hard_mapping_npm
"""
content = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
},
"author": {
"email": "moranegg@example.com",
"name": "Morane G"
}
}
"""
declared_metadata = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"name": "test_metadata",
"version": "0.0.2",
"description": "Simple package.json test for indexer",
"codeRepository": "git+https://github.com/moranegg/metadata_test",
"author": [
{
"type": "Person",
"name": "Morane G",
"email": "moranegg@example.com",
}
],
}
result = MAPPINGS["NpmMapping"]().translate(content)
assert declared_metadata == result
def test_compute_metadata_invalid_description_npm():
"""
testing only computation of metadata with hard_mapping_npm
"""
content = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": 1234
}
"""
declared_metadata = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"name": "test_metadata",
"version": "0.0.2",
}
result = MAPPINGS["NpmMapping"]().translate(content)
assert declared_metadata == result
def test_index_content_metadata_npm(storage, obj_storage):
"""
testing NPM with package.json
- one sha1 uses a file that can't be translated to metadata and
should return None in the translated metadata
"""
sha1s = [
MAPPING_DESCRIPTION_CONTENT_SHA1["json:test-metadata-package.json"],
MAPPING_DESCRIPTION_CONTENT_SHA1["json:npm-package.json"],
MAPPING_DESCRIPTION_CONTENT_SHA1["python:code"],
]
# this metadata indexer computes only metadata for package.json
# in npm context with a hard mapping
config = BASE_TEST_CONFIG.copy()
config["tools"] = [TRANSLATOR_TOOL]
metadata_indexer = ContentMetadataTestIndexer(config=config)
metadata_indexer.run(sha1s, log_suffix="unknown content")
results = list(metadata_indexer.idx_storage.content_metadata_get(sha1s))
expected_results = [
ContentMetadataRow(
id=sha1s[0],
tool=TRANSLATOR_TOOL,
metadata={
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"codeRepository": "git+https://github.com/moranegg/metadata_test",
"description": "Simple package.json test for indexer",
"name": "test_metadata",
"version": "0.0.1",
},
),
ContentMetadataRow(
id=sha1s[1],
tool=TRANSLATOR_TOOL,
metadata={
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"issueTracker": "https://github.com/npm/npm/issues",
"author": [
{
"type": "Person",
"name": "Isaac Z. Schlueter",
"email": "i@izs.me",
"url": "http://blog.izs.me",
}
],
"codeRepository": "git+https://github.com/npm/npm",
"description": "a package manager for JavaScript",
"license": "https://spdx.org/licenses/Artistic-2.0",
"version": "5.0.3",
"name": "npm",
"url": "https://docs.npmjs.com/",
},
),
]
for result in results:
del result.tool["id"]
result.metadata.pop("keywords", None)
# The assertion below returns False sometimes because of nested lists
assert expected_results == results
def test_npm_null_list_item_normalization():
package_json = b"""{
"name": "foo",
"keywords": [
"foo",
null
],
"homepage": [
"http://example.org/",
null
]
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"type": "SoftwareSourceCode",
"url": "http://example.org/",
"keywords": "foo",
}
def test_npm_bugs_normalization():
# valid dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"url": "https://github.com/owner/project/issues",
"email": "foo@example.com"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"issueTracker": "https://github.com/owner/project/issues",
"type": "SoftwareSourceCode",
}
# "invalid" dictionary
package_json = b"""{
"name": "foo",
"bugs": {
"email": "foo@example.com"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"type": "SoftwareSourceCode",
}
# string
package_json = b"""{
"name": "foo",
"bugs": "https://github.com/owner/project/issues"
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"issueTracker": "https://github.com/owner/project/issues",
"type": "SoftwareSourceCode",
}
def test_npm_repository_normalization():
# normal
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git",
"url" : "https://github.com/npm/cli.git"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"codeRepository": "git+https://github.com/npm/cli.git",
"type": "SoftwareSourceCode",
}
# missing url
package_json = b"""{
"name": "foo",
"repository": {
"type" : "git"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"type": "SoftwareSourceCode",
}
# github shortcut
package_json = b"""{
"name": "foo",
"repository": "github:npm/cli"
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
expected_result = {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"codeRepository": "git+https://github.com/npm/cli.git",
"type": "SoftwareSourceCode",
}
assert result == expected_result
# github shortshortcut
package_json = b"""{
"name": "foo",
"repository": "npm/cli"
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == expected_result
# gitlab shortcut
package_json = b"""{
"name": "foo",
"repository": "gitlab:user/repo"
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"name": "foo",
"codeRepository": "git+https://gitlab.com/user/repo.git",
"type": "SoftwareSourceCode",
}
def test_npm_author():
package_json = rb"""{
"version": "1.0.0",
"author": "Foo Bar (@example)"
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "Foo Bar", "type": "Person"}],
"version": "1.0.0",
}
def test_npm_invalid_uris():
package_json = rb"""{
"version": "1.0.0",
"homepage": "",
"author": {
"name": "foo",
"url": "http://example.org"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person", "url": "http://example.org"}],
"version": "1.0.0",
}
package_json = rb"""{
"version": "1.0.0",
"homepage": "http://example.org",
"author": {
"name": "foo",
"url": ""
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person"}],
"url": "http://example.org",
"version": "1.0.0",
}
package_json = rb"""{
"version": "1.0.0",
"homepage": "",
"author": {
"name": "foo",
"url": ""
},
"bugs": ""
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person"}],
"version": "1.0.0",
}
package_json = rb"""{
"version": "1.0.0",
"homepage": "http:example.org",
"author": {
"name": "foo",
"url": "http:example.com"
},
"bugs": {
"url": "http:example.com"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person"}],
"version": "1.0.0",
}
+ package_json = rb"""{
+ "version": "1.0.0",
+ "repository": "git+https://g ithub.com/foo/bar.git"
+}"""
+ result = MAPPINGS["NpmMapping"]().translate(package_json)
+ assert result == {
+ "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+ "type": "SoftwareSourceCode",
+ "version": "1.0.0",
+ }
+
def test_npm_invalid_licenses():
package_json = rb"""{
"version": "1.0.0",
"license": "SEE LICENSE IN LICENSE.md",
"author": {
"name": "foo",
"url": "http://example.org"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person", "url": "http://example.org"}],
"version": "1.0.0",
}
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(json_document_strategy(keys=list(MAPPINGS["NpmMapping"].mapping))) # type: ignore
def test_npm_adversarial(doc):
raw = json.dumps(doc).encode()
MAPPINGS["NpmMapping"]().translate(raw)
@pytest.mark.parametrize(
"filename", [b"package.json", b"Package.json", b"PACKAGE.json", b"PACKAGE.JSON"]
)
def test_detect_metadata_package_json(filename):
df = [
{
"sha1_git": b"abc",
"name": b"index.js",
"target": b"abc",
"length": 897,
"status": "visible",
"type": "file",
"perms": 33188,
"dir_id": b"dir_a",
"sha1": b"bcd",
},
{
"sha1_git": b"aab",
"name": filename,
"target": b"aab",
"length": 712,
"status": "visible",
"type": "file",
"perms": 33188,
"dir_id": b"dir_a",
"sha1": b"cde",
},
]
results = detect_metadata(df)
expected_results = {"NpmMapping": [b"cde"]}
assert expected_results == results