diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 2810673..f6253d7 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,363 +1,371 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+import urllib.parse
import uuid
import xml.parsers.expat

from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml

from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1


class DirectoryLsEntry(TypedDict):
    target: Sha1
    sha1: Sha1
    name: bytes
    type: str


TTranslateCallable = TypeVar(
    "TTranslateCallable",
    bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None],
)
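# The four bound parameters correspond to (self, graph, root, value):
# translate_* methods are invoked as translation_method(graph, root, v)
# in DictMapping._translate_dict below.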


def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Returns a decorator that marks the decorated function as adding
    the given terms to the ``translated_metadata`` dict"""

    def decorator(f: TTranslateCallable) -> TTranslateCallable:
        if not hasattr(f, "produced_terms"):
            f.produced_terms = []  # type: ignore
        f.produced_terms.extend(uris)  # type: ignore
        return f

    return decorator
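
# Illustrative use of the decorator (names are examples, not from this diff):
#
#     @produce_terms(SCHEMA.issueTracker)
#     def translate_bugs(self, graph, root, bugs):
#         ...
#
# supported_terms() below then picks up SCHEMA.issueTracker from the
# method's ``produced_terms`` attribute.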


class BaseMapping:
    """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`,
    not to be inherited directly."""

    def __init__(self, log_suffix=""):
        self.log_suffix = log_suffix
        self.log = logging.getLogger(
            "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
        )

    @property
    def name(self):
        """A name of this mapping, used as an identifier in the
        indexer storage."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Translates content by parsing content from a bytestring containing
        mapping-specific data and translating with the appropriate mapping
        to JSON-LD using the Codemeta and ForgeFed vocabularies.

        Args:
            raw_content: raw content to translate

        Returns:
            translated metadata in JSON friendly form needed for the content
            if parseable, :const:`None` otherwise.
        """
        raise NotImplementedError(f"{self.__class__.__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")


class BaseExtrinsicMapping(BaseMapping):
    """Base class for extrinsic_metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
        """
        Returns the list of extrinsic metadata formats which can be translated
        by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=True)


class BaseIntrinsicMapping(BaseMapping):
    """Base class for intrinsic-metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """
        Returns the sha1 hashes of files which can be translated by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        return compact(metadata, forgefed=False)


class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file as input."""

    @property
    def filename(self):
        """The .json file to extract metadata from."""
        raise NotImplementedError(f"{self.__class__.__name__}.filename")

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        for entry in file_entries:
            if entry["name"].lower() == cls.filename:
                return [entry["sha1"]]
        return []
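
    # Note: the name comparison is case-insensitive, so e.g. b"PACKAGE.JSON"
    # matches a mapping whose ``filename`` is b"package.json" (exercised by
    # test_detect_metadata_package_json in the test diff below).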


class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    string_fields: List[str] = []
    """List of fields that are simple strings, and don't need any
    normalization."""

    uri_fields: List[str] = []
    """List of fields that are simple URIs, and don't need any
    normalization."""

    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")

    @staticmethod
    def _normalize_method_name(name: str) -> str:
        # Dashes are not valid in Python identifiers; map them to underscores
        # so keys like "foo-bar" can be handled by translate_foo_bar or
        # normalize_foo_bar methods.
        return name.replace("-", "_")

    @classmethod
    def supported_terms(cls):
        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            str(term)
            for (key, term) in cls.mapping.items()
            if key in cls.string_fields + cls.uri_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }

        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            str(term)
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }

        return simple_terms | complex_terms
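
    # Illustrative (hypothetical subclass): with mapping = {"name": SCHEMA.name}
    # and string_fields = ["name"], plus a translate_* method decorated with
    # @produce_terms(SCHEMA.issueTracker), supported_terms() would return
    # {"http://schema.org/name", "http://schema.org/issueTracker"}.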

    def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): content dict to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        graph = rdflib.Graph()

        # The main object being described (the SoftwareSourceCode) may or may
        # not have an id.
        # Either way, we temporarily use this URI to identify it. Unfortunately,
        # we cannot use a blank node as we need to use it for JSON-LD framing later,
        # and blank nodes cannot be used for framing in JSON-LD >= 1.1
        root_id = (
            "https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
            + str(uuid.uuid4())
        )
        root = rdflib.URIRef(root_id)
        graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode))

        for k, v in content_dict.items():
            # First, check if there is a specific translation
            # method for this key
            translation_method = getattr(
                self, "translate_" + self._normalize_method_name(k), None
            )
            if translation_method:
                translation_method(graph, root, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table
                codemeta_key = self.mapping[k]

                # if there is a normalization method, use it on the value,
                # and add its results to the triples
                normalization_method = getattr(
                    self, "normalize_" + self._normalize_method_name(k), None
                )
                if normalization_method:
                    v = normalization_method(v)
                    if v is None:
                        pass
                    elif isinstance(v, list):
                        for item in reversed(v):
                            graph.add((root, codemeta_key, item))
                    else:
                        graph.add((root, codemeta_key, v))
                elif k in self.string_fields and isinstance(v, str):
                    graph.add((root, codemeta_key, rdflib.Literal(v)))
                elif k in self.string_fields and isinstance(v, list):
                    for item in v:
                        graph.add((root, codemeta_key, rdflib.Literal(item)))
                elif k in self.uri_fields and isinstance(v, str):
-                    graph.add((root, codemeta_key, rdflib.URIRef(v)))
+                    # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop
+                    # URLs that are blatantly invalid early, so PyLD does not crash.
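+                    # urlparse() only reports a netloc when the URL contains a
+                    # "//"-delimited authority, so strings like "http:example.org"
+                    # or "" are rejected by the check below.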
+                    parsed_url = urllib.parse.urlparse(v)
+                    if parsed_url.netloc:
+                        graph.add((root, codemeta_key, rdflib.URIRef(v)))
                elif k in self.uri_fields and isinstance(v, list):
                    for item in v:
                        if isinstance(item, str):
-                            graph.add((root, codemeta_key, rdflib.URIRef(item)))
+                            # ditto
+                            parsed_url = urllib.parse.urlparse(item)
+                            if parsed_url.netloc:
+                                graph.add((root, codemeta_key, rdflib.URIRef(item)))
                else:
                    continue

        self.extra_translation(graph, root, content_dict)

        self.sanitize(graph)

        # Convert from rdflib's internal graph representation to JSON
        s = graph.serialize(format="application/ld+json")

        # Load from JSON to a list of Python objects
        jsonld_graph = json.loads(s)

        # Use JSON-LD framing to turn the graph into a rooted tree
        # frame = {"@type": str(SCHEMA.SoftwareSourceCode)}
        translated_metadata = jsonld.frame(
            jsonld_graph,
            {"@id": root_id},
            options={
                "documentLoader": _document_loader,
                "processingMode": "json-ld-1.1",
            },
        )

        # Remove the temporary id we added at the beginning
        if isinstance(translated_metadata["@id"], list):
            translated_metadata["@id"].remove(root_id)
        else:
            del translated_metadata["@id"]

        return self.normalize_translation(translated_metadata)
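
    # Illustrative result (hypothetical mapping with "name" in string_fields,
    # mapped to SCHEMA.name): _translate_dict({"name": "foo"}) returns roughly
    # {"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
    #  "type": "SoftwareSourceCode", "name": "foo"} after compaction.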

    def sanitize(self, graph: rdflib.Graph) -> None:
        # Remove triples that make PyLD crash
        for (subject, predicate, _) in graph.triples((None, None, rdflib.URIRef(""))):
            graph.remove((subject, predicate, rdflib.URIRef("")))

        # Should not happen, but we'd better check, as this may lead to incorrect data
        invalid = False
        for triple in graph.triples((rdflib.URIRef(""), None, None)):
            invalid = True
            logging.error("Empty triple subject URI: %r", triple)
        if invalid:
            raise ValueError("Empty triple subject(s)")

    def extra_translation(
        self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any]
    ) -> None:
        """Called at the end of the translation process, and may add arbitrary triples
        to ``graph`` based on the input dictionary (passed as ``d``).
        """
        pass


class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            content_dict = json.loads(raw_content_string)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None
        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)
        return None


class XmlMapping(DictMapping):
    """Base class for all mappings that use XML data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        try:
            d = xmltodict.parse(raw_content)
        except xml.parsers.expat.ExpatError:
            self.log.warning("Error parsing XML from %s", self.log_suffix)
            return None
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding XML from %s", self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
            return None
        if not isinstance(d, dict):
            self.log.warning("Skipping ill-formed XML content: %s", raw_content)
            return None
        return self._translate_dict(d)
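

# Subclass of yaml.SafeLoader with the implicit resolver for
# "tag:yaml.org,2002:timestamp" removed, so dates stay plain strings instead
# of being parsed into datetime objects (which are not JSON-serializable).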
class SafeLoader(yaml.SafeLoader):
    yaml_implicit_resolvers = {
        k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
        for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }


class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
    """Base class for all mappings that use Yaml data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        raw_content_string: str = raw_content.decode()
        try:
            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
        except yaml.scanner.ScannerError:
            return None

        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)

        return None
diff --git a/swh/indexer/tests/metadata_dictionary/test_npm.py b/swh/indexer/tests/metadata_dictionary/test_npm.py
index f0abe1e..64f4ed2 100644
--- a/swh/indexer/tests/metadata_dictionary/test_npm.py
+++ b/swh/indexer/tests/metadata_dictionary/test_npm.py
@@ -1,386 +1,402 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json

from hypothesis import HealthCheck, given, settings
import pytest

from swh.indexer.metadata_detector import detect_metadata
from swh.indexer.metadata_dictionary import MAPPINGS
from swh.indexer.storage.model import ContentMetadataRow

from ..test_metadata import TRANSLATOR_TOOL, ContentMetadataTestIndexer
from ..utils import (
    BASE_TEST_CONFIG,
    MAPPING_DESCRIPTION_CONTENT_SHA1,
    json_document_strategy,
)


def test_compute_metadata_none():
    """
    Translating empty content should return None.
    """
    content = b""

    # None if no metadata was found or an error occurred
    declared_metadata = None

    result = MAPPINGS["NpmMapping"]().translate(content)
    assert declared_metadata == result


def test_compute_metadata_npm():
    """
    testing only computation of metadata with hard_mapping_npm
    """
    content = b"""
    {
        "name": "test_metadata",
        "version": "0.0.2",
        "description": "Simple package.json test for indexer",
        "repository": {
            "type": "git",
            "url": "https://github.com/moranegg/metadata_test"
        },
        "author": {
            "email": "moranegg@example.com",
            "name": "Morane G"
        }
    }
    """
    declared_metadata = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "name": "test_metadata",
        "version": "0.0.2",
        "description": "Simple package.json test for indexer",
        "codeRepository": "git+https://github.com/moranegg/metadata_test",
        "author": [
            {
                "type": "Person",
                "name": "Morane G",
                "email": "moranegg@example.com",
            }
        ],
    }

    result = MAPPINGS["NpmMapping"]().translate(content)
    assert declared_metadata == result


def test_compute_metadata_invalid_description_npm():
    """
    testing only computation of metadata with hard_mapping_npm
    """
    content = b"""
    {
        "name": "test_metadata",
        "version": "0.0.2",
        "description": 1234
    }
    """
    declared_metadata = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "name": "test_metadata",
        "version": "0.0.2",
    }

    result = MAPPINGS["NpmMapping"]().translate(content)
    assert declared_metadata == result


def test_index_content_metadata_npm(storage, obj_storage):
    """
    testing NPM with package.json

    - one sha1 uses a file that can't be translated to metadata and
      should return None in the translated metadata
    """
    sha1s = [
        MAPPING_DESCRIPTION_CONTENT_SHA1["json:test-metadata-package.json"],
        MAPPING_DESCRIPTION_CONTENT_SHA1["json:npm-package.json"],
        MAPPING_DESCRIPTION_CONTENT_SHA1["python:code"],
    ]

    # this metadata indexer computes only metadata for package.json
    # in npm context with a hard mapping
    config = BASE_TEST_CONFIG.copy()
    config["tools"] = [TRANSLATOR_TOOL]
    metadata_indexer = ContentMetadataTestIndexer(config=config)
    metadata_indexer.run(sha1s, log_suffix="unknown content")

    results = list(metadata_indexer.idx_storage.content_metadata_get(sha1s))

    expected_results = [
        ContentMetadataRow(
            id=sha1s[0],
            tool=TRANSLATOR_TOOL,
            metadata={
                "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                "type": "SoftwareSourceCode",
                "codeRepository": "git+https://github.com/moranegg/metadata_test",
                "description": "Simple package.json test for indexer",
                "name": "test_metadata",
                "version": "0.0.1",
            },
        ),
        ContentMetadataRow(
            id=sha1s[1],
            tool=TRANSLATOR_TOOL,
            metadata={
                "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                "type": "SoftwareSourceCode",
                "issueTracker": "https://github.com/npm/npm/issues",
                "author": [
                    {
                        "type": "Person",
                        "name": "Isaac Z. Schlueter",
                        "email": "i@izs.me",
                        "url": "http://blog.izs.me",
                    }
                ],
                "codeRepository": "git+https://github.com/npm/npm",
                "description": "a package manager for JavaScript",
                "license": "https://spdx.org/licenses/Artistic-2.0",
                "version": "5.0.3",
                "name": "npm",
                "url": "https://docs.npmjs.com/",
            },
        ),
    ]

    for result in results:
        del result.tool["id"]
        result.metadata.pop("keywords", None)

    # The assertion below returns False sometimes because of nested lists
    assert expected_results == results


def test_npm_null_list_item_normalization():
    package_json = b"""{
        "name": "foo",
        "keywords": [
            "foo",
            null
        ],
        "homepage": [
            "http://example.org/",
            null
        ]
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
        "url": "http://example.org/",
        "keywords": "foo",
    }


def test_npm_bugs_normalization():
    # valid dictionary
    package_json = b"""{
        "name": "foo",
        "bugs": {
            "url": "https://github.com/owner/project/issues",
            "email": "foo@example.com"
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "issueTracker": "https://github.com/owner/project/issues",
        "type": "SoftwareSourceCode",
    }

    # "invalid" dictionary
    package_json = b"""{
        "name": "foo",
        "bugs": {
            "email": "foo@example.com"
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
    }

    # string
    package_json = b"""{
        "name": "foo",
        "bugs": "https://github.com/owner/project/issues"
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "issueTracker": "https://github.com/owner/project/issues",
        "type": "SoftwareSourceCode",
    }


def test_npm_repository_normalization():
    # normal
    package_json = b"""{
        "name": "foo",
        "repository": {
            "type" : "git",
            "url" : "https://github.com/npm/cli.git"
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "codeRepository": "git+https://github.com/npm/cli.git",
        "type": "SoftwareSourceCode",
    }

    # missing url
    package_json = b"""{
        "name": "foo",
        "repository": {
            "type" : "git"
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
    }

    # github shortcut
    package_json = b"""{
        "name": "foo",
        "repository": "github:npm/cli"
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    expected_result = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "codeRepository": "git+https://github.com/npm/cli.git",
        "type": "SoftwareSourceCode",
    }
    assert result == expected_result

    # github shortshortcut
    package_json = b"""{
        "name": "foo",
        "repository": "npm/cli"
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == expected_result

    # gitlab shortcut
    package_json = b"""{
        "name": "foo",
        "repository": "gitlab:user/repo"
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "codeRepository": "git+https://gitlab.com/user/repo.git",
        "type": "SoftwareSourceCode",
    }


-def test_npm_empty_uris():
+def test_npm_invalid_uris():
    package_json = rb"""{
        "version": "1.0.0",
        "homepage": "",
        "author": {
            "name": "foo",
            "url": "http://example.org"
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "author": [{"name": "foo", "type": "Person", "url": "http://example.org"}],
        "version": "1.0.0",
    }

    package_json = rb"""{
        "version": "1.0.0",
        "homepage": "http://example.org",
        "author": {
            "name": "foo",
            "url": ""
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "author": [{"name": "foo", "type": "Person"}],
        "url": "http://example.org",
        "version": "1.0.0",
    }

    package_json = rb"""{
        "version": "1.0.0",
        "homepage": "",
        "author": {
            "name": "foo",
            "url": ""
        }
    }"""
    result = MAPPINGS["NpmMapping"]().translate(package_json)
    assert result == {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "author": [{"name": "foo", "type": "Person"}],
        "version": "1.0.0",
    }
+    package_json = rb"""{
+        "version": "1.0.0",
+        "homepage": "http:example.org",
+        "author": {
+            "name": "foo",
+            "url": "http:example.com"
+        }
+    }"""
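+    # "http:example.org" and "http:example.com" have a scheme but no "//"
+    # authority, so urllib.parse.urlparse() reports an empty netloc and the
+    # mapping drops both URLs.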
+    result = MAPPINGS["NpmMapping"]().translate(package_json)
+    assert result == {
+        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+        "type": "SoftwareSourceCode",
+        "author": [{"name": "foo", "type": "Person"}],
+        "version": "1.0.0",
+    }
+


@settings(suppress_health_check=[HealthCheck.too_slow])
@given(json_document_strategy(keys=list(MAPPINGS["NpmMapping"].mapping)))  # type: ignore
def test_npm_adversarial(doc):
    raw = json.dumps(doc).encode()
    MAPPINGS["NpmMapping"]().translate(raw)


@pytest.mark.parametrize(
    "filename", [b"package.json", b"Package.json", b"PACKAGE.json", b"PACKAGE.JSON"]
)
def test_detect_metadata_package_json(filename):
    df = [
        {
            "sha1_git": b"abc",
            "name": b"index.js",
            "target": b"abc",
            "length": 897,
            "status": "visible",
            "type": "file",
            "perms": 33188,
            "dir_id": b"dir_a",
            "sha1": b"bcd",
        },
        {
            "sha1_git": b"aab",
            "name": filename,
            "target": b"aab",
            "length": 712,
            "status": "visible",
            "type": "file",
            "perms": 33188,
            "dir_id": b"dir_a",
            "sha1": b"cde",
        },
    ]
    results = detect_metadata(df)

    expected_results = {"NpmMapping": [b"cde"]}
    assert expected_results == results