Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 2810673..f6253d7 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,363 +1,371 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+import urllib.parse
import uuid
import xml.parsers.expat
from pyld import jsonld
import rdflib
from typing_extensions import TypedDict
import xmltodict
import yaml
from swh.indexer.codemeta import _document_loader, compact
from swh.indexer.namespaces import RDF, SCHEMA
from swh.indexer.storage.interface import Sha1
class DirectoryLsEntry(TypedDict):
    """Typed shape of one directory-listing entry handed to the mappings.

    Only the keys declared below are described here.
    NOTE(review): the dicts passed by callers may carry additional keys --
    confirm against the code producing them.
    """

    # presumably the identifier of the object the entry points to -- TODO confirm
    target: Sha1
    # value returned by ``detect_metadata_files`` for a matching entry
    sha1: Sha1
    # file name as bytes; compared lower-cased against a mapping's ``filename``
    name: bytes
    # kind of entry, as a string
    type: str
# Type variable bound to callables taking four positional arguments (typed
# Any, rdflib.Graph, rdflib.term.BNode, Any) and returning None.
# ``produce_terms`` uses it so the decorator returns the same callable type
# it was given.
TTranslateCallable = TypeVar(
"TTranslateCallable",
bound=Callable[[Any, rdflib.Graph, rdflib.term.BNode, Any], None],
)
def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Builds a decorator recording ``uris`` on the function it decorates.

    The URIs are appended to the function's ``produced_terms`` list, which is
    created on first use; the decorated function itself is returned unchanged.
    """

    def mark(func: TTranslateCallable) -> TTranslateCallable:
        try:
            recorded = func.produced_terms  # type: ignore
        except AttributeError:
            # First decoration of this function: start with an empty list
            recorded = func.produced_terms = []  # type: ignore
        recorded.extend(uris)
        return func

    return mark
class BaseMapping:
    """Common ancestor of :class:`BaseExtrinsicMapping` and
    :class:`BaseIntrinsicMapping`; concrete mappings should derive from one of
    those two instead of this class."""

    def __init__(self, log_suffix=""):
        own_class = type(self)
        self.log_suffix = log_suffix
        # One logger per concrete mapping class, named "<module>.<class>"
        self.log = logging.getLogger(f"{own_class.__module__}.{own_class.__name__}")

    @property
    def name(self):
        """Identifier of this mapping, as used in the indexer storage."""
        raise NotImplementedError(f"{type(self).__name__}.name")

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Parses a bytestring holding mapping-specific data and translates it
        to JSON-LD using the Codemeta and ForgeFed vocabularies.

        Args:
            raw_content: raw content to translate

        Returns:
            the translated metadata, in JSON-friendly form, if the content
            could be parsed; :const:`None` otherwise.
        """
        raise NotImplementedError(f"{type(self).__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Hook to post-process translated metadata; subclasses must override."""
        raise NotImplementedError(f"{type(self).__name__}.normalize_translation")
class BaseExtrinsicMapping(BaseMapping):
    """Parent class of the mappings that translate extrinsic metadata.

    A new mapping is written by:

    - inheriting from this class
    - overriding the translate function
    """

    @classmethod
    def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
        """
        Gives the extrinsic metadata formats that this mapping is able to
        translate
        """
        unimplemented = f"{cls.__name__}.extrinsic_metadata_formats"
        raise NotImplementedError(unimplemented)

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Compacts ``metadata`` with :func:`compact`, passing ``forgefed=True``."""
        compacted = compact(metadata, forgefed=True)
        return compacted
class BaseIntrinsicMapping(BaseMapping):
    """Parent class of the mappings that translate intrinsic metadata.

    A new mapping is written by:

    - inheriting from this class
    - overriding the translate function
    """

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """
        Gives the sha1 hashes of the files, among ``file_entries``, that this
        mapping is able to translate
        """
        unimplemented = f"{cls.__name__}.detect_metadata_files"
        raise NotImplementedError(unimplemented)

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Compacts ``metadata`` with :func:`compact`, passing ``forgefed=False``."""
        compacted = compact(metadata, forgefed=False)
        return compacted
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Parent class of the intrinsic metadata mappings reading a single file."""

    @property
    def filename(self):
        """Name of the file to extract metadata from.

        It is compared with the lower-cased (bytes) name of directory entries.
        """
        raise NotImplementedError(f"{type(self).__name__}.filename")

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """Returns the sha1 of the first entry named like ``cls.filename``
        (case-insensitively), in a one-item list; an empty list if none is."""
        for candidate in file_entries:
            if candidate["name"].lower() != cls.filename:
                continue
            # Only the first match is reported
            return [candidate["sha1"]]
        return []
class DictMapping(BaseMapping):
"""Base class for mappings that take as input a file that is mostly
a key-value store (eg. a shallow JSON dict).

Subclasses provide ``mapping`` (input key to term) and may define
``translate_KEY`` or ``normalize_KEY`` methods; these are looked up by key
name, with ``-`` replaced by ``_``, when a dict is translated."""
# Mapped keys listed here are added as literals when no translate_/normalize_
# method exists for them.
string_fields: List[str] = []
"""List of fields that are simple strings, and don't need any
normalization."""
# Mapped keys listed here are added as URI references under the same
# condition.
uri_fields: List[str] = []
"""List of fields that are simple URIs, and don't need any
normalization."""
@property
def mapping(self):
    """Translation dict from the input dict's keys to canonical names;
    to be provided by subclasses."""
    owner = type(self).__name__
    raise NotImplementedError(f"{owner}.mapping")
@staticmethod
def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls):
    """Returns the set of terms (as strings) this mapping may produce.

    This gathers the terms mapped from keys that are either plain
    string/URI fields or have a ``normalize_*`` method, together with the
    ``produced_terms`` declared on the ``translate_*`` methods.
    """
    terms = set()

    # one-to-one mapping from the original key to a CodeMeta term
    for key, term in cls.mapping.items():
        if key in cls.string_fields + cls.uri_fields or hasattr(
            cls, "normalize_" + cls._normalize_method_name(key)
        ):
            terms.add(str(term))

    # more complex mapping from the original key to JSON-LD
    for attr_name in dir(cls):
        if not attr_name.startswith("translate_"):
            continue
        translator = getattr(cls, attr_name)
        for term in getattr(translator, "produced_terms", []):
            terms.add(str(term))

    return terms
def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
"""
Translates an already-parsed key-value document into metadata, using
this mapping.

Triples are first accumulated in an rdflib graph rooted at a temporary
URI; the graph is then serialized to JSON-LD, framed on that root with
PyLD, stripped of the temporary id, and passed to
``normalize_translation``.

For each key, the first applicable rule is used: a ``translate_KEY``
method; otherwise, for keys present in ``mapping``, a ``normalize_KEY``
method, then ``string_fields``, then ``uri_fields``. Keys matching none
of these are ignored.

Args:
content_dict (dict): content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
graph = rdflib.Graph()
# The main object being described (the SoftwareSourceCode) may or may not
# have an id.
# Either way, we temporarily use this URI to identify it. Unfortunately,
# we cannot use a blank node as we need to use it for JSON-LD framing later,
# and blank nodes cannot be used for framing in JSON-LD >= 1.1
root_id = (
"https://www.softwareheritage.org/schema/2022/indexer/tmp-node/"
+ str(uuid.uuid4())
)
root = rdflib.URIRef(root_id)
graph.add((root, RDF.type, SCHEMA.SoftwareSourceCode))
for k, v in content_dict.items():
# First, check if there is a specific translation
# method for this key
translation_method = getattr(
self, "translate_" + self._normalize_method_name(k), None
)
if translation_method:
# The translation method adds its own triples to the graph
translation_method(graph, root, v)
elif k in self.mapping:
# if there is no method, but the key is known from the
# crosswalk table
codemeta_key = self.mapping[k]
# if there is a normalization method, use it on the value,
# and add its results to the triples
normalization_method = getattr(
self, "normalize_" + self._normalize_method_name(k), None
)
if normalization_method:
v = normalization_method(v)
# A normalized value of None adds nothing; a list adds one triple
# per item; anything else is added as a single object
if v is None:
pass
elif isinstance(v, list):
for item in reversed(v):
graph.add((root, codemeta_key, item))
else:
graph.add((root, codemeta_key, v))
elif k in self.string_fields and isinstance(v, str):
graph.add((root, codemeta_key, rdflib.Literal(v)))
elif k in self.string_fields and isinstance(v, list):
for item in v:
graph.add((root, codemeta_key, rdflib.Literal(item)))
elif k in self.uri_fields and isinstance(v, str):
- graph.add((root, codemeta_key, rdflib.URIRef(v)))
# NOTE(review): urllib.parse.urlparse is documented to raise ValueError on
# some malformed inputs (e.g. unmatched square brackets in the netloc);
# that exception is not caught here or in the list branch below -- confirm
# whether it should also be treated as "invalid URL, drop it".
+ # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop
+ # URLs that are blatantly invalid early, so PyLD does not crash.
+ parsed_url = urllib.parse.urlparse(v)
+ if parsed_url.netloc:
+ graph.add((root, codemeta_key, rdflib.URIRef(v)))
elif k in self.uri_fields and isinstance(v, list):
for item in v:
if isinstance(item, str):
- graph.add((root, codemeta_key, rdflib.URIRef(item)))
+ # ditto
+ parsed_url = urllib.parse.urlparse(item)
+ if parsed_url.netloc:
+ graph.add((root, codemeta_key, rdflib.URIRef(item)))
else:
continue
# Let subclasses add triples that depend on the whole input dict, then
# drop/reject triples involving the empty URI before handing over to PyLD
self.extra_translation(graph, root, content_dict)
self.sanitize(graph)
# Convert from rdflib's internal graph representation to JSON
s = graph.serialize(format="application/ld+json")
# Load from JSON to a list of Python objects
jsonld_graph = json.loads(s)
# Use JSON-LD framing to turn the graph into a rooted tree
# frame = {"@type": str(SCHEMA.SoftwareSourceCode)}
translated_metadata = jsonld.frame(
jsonld_graph,
{"@id": root_id},
options={
"documentLoader": _document_loader,
"processingMode": "json-ld-1.1",
},
)
# Remove the temporary id we added at the beginning
# ("@id" is either a list containing the temporary id, or just that id)
if isinstance(translated_metadata["@id"], list):
translated_metadata["@id"].remove(root_id)
else:
del translated_metadata["@id"]
return self.normalize_translation(translated_metadata)
def sanitize(self, graph: rdflib.Graph) -> None:
    """Cleans ``graph`` in place of triples involving the empty URI.

    Triples whose object is the empty URI are removed, as they make PyLD
    crash. Triples whose subject is the empty URI are logged and rejected.

    Args:
        graph: the graph to clean up; it is modified in place

    Raises:
        ValueError: if at least one triple has the empty URI as subject
    """
    empty_uri = rdflib.URIRef("")

    # Remove triples that make PyLD crash.
    # ``None`` is a wildcard in rdflib triple patterns, so one call removes
    # every match; this avoids removing triples from the graph while
    # iterating over a live ``graph.triples()`` generator.
    graph.remove((None, None, empty_uri))

    # Should not happen, but we'd better check as this may lead to incorrect
    # data. Materialize the matches so logging works on a stable snapshot.
    offending = list(graph.triples((empty_uri, None, None)))
    for triple in offending:
        # Use the mapping's own logger, like the other methods of the
        # mapping classes, rather than the root logger
        self.log.error("Empty triple subject URI: %r", triple)
    if offending:
        raise ValueError("Empty triple subject(s)")
def extra_translation(
    self, graph: rdflib.Graph, root: rdflib.term.Node, d: Dict[str, Any]
) -> None:
    """Hook run once every key of the input dictionary has been processed.

    Subclasses may override it to add arbitrary triples to ``graph``, based
    on the input dictionary (passed as ``d``). This default implementation
    adds nothing.
    """
    return None
class JsonMapping(DictMapping):
    """Parent class of the mappings whose input is JSON data."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Decodes and parses ``raw_content`` as JSON, then translates it.

        Returns :const:`None` (after logging a warning) when the bytes cannot
        be decoded or parsed, and :const:`None` when the parsed document is
        not a JSON object.
        """
        try:
            text: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None

        try:
            document = json.loads(text)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None

        # Only JSON objects can be handled as key-value stores
        return self._translate_dict(document) if isinstance(document, dict) else None
class XmlMapping(DictMapping):
    """Parent class of the mappings whose input is XML data."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Parses ``raw_content`` with xmltodict, then translates the result.

        Returns :const:`None`, after logging a warning, when parsing fails or
        when the parsed document is not a dict.
        """
        failure = None
        try:
            parsed = xmltodict.parse(raw_content)
        except xml.parsers.expat.ExpatError:
            failure = "Error parsing XML from %s"
        except UnicodeDecodeError:
            # Must stay before the ValueError clause, as UnicodeDecodeError
            # is a subclass of ValueError
            failure = "Error unidecoding XML from %s"
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            failure = "Error detecting XML encoding from %s"

        if failure is not None:
            self.log.warning(failure, self.log_suffix)
            return None

        if isinstance(parsed, dict):
            return self._translate_dict(parsed)

        self.log.warning("Skipping ill-formed XML content: %s", raw_content)
        return None
class SafeLoader(yaml.SafeLoader):
    """``yaml.SafeLoader`` whose implicit resolvers for the YAML timestamp
    tag are dropped.

    NOTE(review): presumably so that timestamp-looking scalars are loaded as
    plain strings rather than date objects -- confirm.
    """

    yaml_implicit_resolvers = {
        first_char: [
            resolver
            for resolver in resolvers
            if resolver[0] != "tag:yaml.org,2002:timestamp"
        ]
        for first_char, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }
class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
    """Base class for all mappings that use Yaml data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        """Decodes and parses ``raw_content`` as YAML, then translates it.

        Args:
            raw_content: raw content to translate

        Returns:
            the translated metadata if the content is a decodable, parseable
            YAML mapping; :const:`None` otherwise (a warning is logged when
            decoding or parsing fails).
        """
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            # Same handling as JsonMapping: undecodable input is not an error
            # worth crashing for, the file is simply skipped
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None

        try:
            # Not the unsafe default loader: SafeLoader derives from
            # yaml.SafeLoader
            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
        except yaml.YAMLError:
            # yaml.YAMLError is the base class of PyYAML's scanner, parser,
            # composer and constructor errors, all of which denote input
            # that cannot be loaded
            self.log.warning("Error parsing YAML from %s", self.log_suffix)
            return None

        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)

        return None
diff --git a/swh/indexer/tests/metadata_dictionary/test_npm.py b/swh/indexer/tests/metadata_dictionary/test_npm.py
index f0abe1e..64f4ed2 100644
--- a/swh/indexer/tests/metadata_dictionary/test_npm.py
+++ b/swh/indexer/tests/metadata_dictionary/test_npm.py
@@ -1,386 +1,402 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from hypothesis import HealthCheck, given, settings
import pytest
from swh.indexer.metadata_detector import detect_metadata
from swh.indexer.metadata_dictionary import MAPPINGS
from swh.indexer.storage.model import ContentMetadataRow
from ..test_metadata import TRANSLATOR_TOOL, ContentMetadataTestIndexer
from ..utils import (
BASE_TEST_CONFIG,
MAPPING_DESCRIPTION_CONTENT_SHA1,
json_document_strategy,
)
def test_compute_metadata_none():
    """
    An empty content cannot be translated: the mapping should return None
    """
    # None if no metadata was found or an error occurred
    expected = None

    translated = MAPPINGS["NpmMapping"]().translate(b"")

    assert expected == translated
def test_compute_metadata_npm():
    """
    Checks only the metadata computed by the hard-coded npm mapping
    """
    package_json = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": "Simple package.json test for indexer",
"repository": {
"type": "git",
"url": "https://github.com/moranegg/metadata_test"
},
"author": {
"email": "moranegg@example.com",
"name": "Morane G"
}
}
"""
    expected_author = {
        "type": "Person",
        "name": "Morane G",
        "email": "moranegg@example.com",
    }
    expected = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "name": "test_metadata",
        "version": "0.0.2",
        "description": "Simple package.json test for indexer",
        "codeRepository": "git+https://github.com/moranegg/metadata_test",
        "author": [expected_author],
    }

    translated = MAPPINGS["NpmMapping"]().translate(package_json)

    assert expected == translated
def test_compute_metadata_invalid_description_npm():
    """
    Checks that a non-string description is left out of the metadata
    computed by the hard-coded npm mapping
    """
    package_json = b"""
{
"name": "test_metadata",
"version": "0.0.2",
"description": 1234
}
"""
    expected = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "name": "test_metadata",
        "version": "0.0.2",
    }

    translated = MAPPINGS["NpmMapping"]().translate(package_json)

    assert expected == translated
def test_index_content_metadata_npm(storage, obj_storage):
    """
    Runs the content metadata indexer on npm package.json files
    - one sha1 uses a file that can't be translated to metadata and
    should return None in the translated metadata
    """
    content_keys = (
        "json:test-metadata-package.json",
        "json:npm-package.json",
        "python:code",
    )
    sha1s = [MAPPING_DESCRIPTION_CONTENT_SHA1[key] for key in content_keys]

    # this metadata indexer computes only metadata for package.json
    # in npm context with a hard mapping
    config = BASE_TEST_CONFIG.copy()
    config["tools"] = [TRANSLATOR_TOOL]
    indexer = ContentMetadataTestIndexer(config=config)
    indexer.run(sha1s, log_suffix="unknown content")
    results = list(indexer.idx_storage.content_metadata_get(sha1s))

    test_package_metadata = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "codeRepository": "git+https://github.com/moranegg/metadata_test",
        "description": "Simple package.json test for indexer",
        "name": "test_metadata",
        "version": "0.0.1",
    }
    npm_package_metadata = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "type": "SoftwareSourceCode",
        "issueTracker": "https://github.com/npm/npm/issues",
        "author": [
            {
                "type": "Person",
                "name": "Isaac Z. Schlueter",
                "email": "i@izs.me",
                "url": "http://blog.izs.me",
            }
        ],
        "codeRepository": "git+https://github.com/npm/npm",
        "description": "a package manager for JavaScript",
        "license": "https://spdx.org/licenses/Artistic-2.0",
        "version": "5.0.3",
        "name": "npm",
        "url": "https://docs.npmjs.com/",
    }
    expected_results = [
        ContentMetadataRow(
            id=sha1s[0], tool=TRANSLATOR_TOOL, metadata=test_package_metadata
        ),
        ContentMetadataRow(
            id=sha1s[1], tool=TRANSLATOR_TOOL, metadata=npm_package_metadata
        ),
    ]

    # Drop what is not compared: the tool id, and the keywords
    for row in results:
        del row.tool["id"]
        row.metadata.pop("keywords", None)

    # The assertion below returns False sometimes because of nested lists
    assert expected_results == results
def test_npm_null_list_item_normalization():
    """Null items in the keywords and homepage lists are left out of the
    translation."""
    package_json = b"""{
"name": "foo",
"keywords": [
"foo",
null
],
"homepage": [
"http://example.org/",
null
]
}"""
    expected = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
        "url": "http://example.org/",
        "keywords": "foo",
    }

    translated = MAPPINGS["NpmMapping"]().translate(package_json)

    assert translated == expected
def test_npm_bugs_normalization():
    """Checks how the various shapes of the "bugs" field are translated."""
    without_tracker = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
    }
    with_tracker = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "issueTracker": "https://github.com/owner/project/issues",
        "type": "SoftwareSourceCode",
    }

    cases = [
        # valid dictionary
        (
            b"""{
"name": "foo",
"bugs": {
"url": "https://github.com/owner/project/issues",
"email": "foo@example.com"
}
}""",
            with_tracker,
        ),
        # "invalid" dictionary
        (
            b"""{
"name": "foo",
"bugs": {
"email": "foo@example.com"
}
}""",
            without_tracker,
        ),
        # string
        (
            b"""{
"name": "foo",
"bugs": "https://github.com/owner/project/issues"
}""",
            with_tracker,
        ),
    ]

    for package_json, expected in cases:
        result = MAPPINGS["NpmMapping"]().translate(package_json)
        assert result == expected
def test_npm_repository_normalization():
    """Checks how the various shapes of the "repository" field are translated."""
    no_repository = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "type": "SoftwareSourceCode",
    }
    github_repository = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "codeRepository": "git+https://github.com/npm/cli.git",
        "type": "SoftwareSourceCode",
    }
    gitlab_repository = {
        "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
        "name": "foo",
        "codeRepository": "git+https://gitlab.com/user/repo.git",
        "type": "SoftwareSourceCode",
    }

    cases = [
        # normal
        (
            b"""{
"name": "foo",
"repository": {
"type" : "git",
"url" : "https://github.com/npm/cli.git"
}
}""",
            github_repository,
        ),
        # missing url
        (
            b"""{
"name": "foo",
"repository": {
"type" : "git"
}
}""",
            no_repository,
        ),
        # github shortcut
        (
            b"""{
"name": "foo",
"repository": "github:npm/cli"
}""",
            github_repository,
        ),
        # github shortshortcut
        (
            b"""{
"name": "foo",
"repository": "npm/cli"
}""",
            github_repository,
        ),
        # gitlab shortcut
        (
            b"""{
"name": "foo",
"repository": "gitlab:user/repo"
}""",
            gitlab_repository,
        ),
    ]

    for package_json, expected in cases:
        result = MAPPINGS["NpmMapping"]().translate(package_json)
        assert result == expected
# Checks that URI fields holding an unusable value are left out of the
# translation, while valid URIs next to them are kept.
-def test_npm_empty_uris():
+def test_npm_invalid_uris():
# Case 1: empty homepage, valid author URL -- only the author URL is kept
package_json = rb"""{
"version": "1.0.0",
"homepage": "",
"author": {
"name": "foo",
"url": "http://example.org"
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person", "url": "http://example.org"}],
"version": "1.0.0",
}
# Case 2: valid homepage, empty author URL -- only the homepage is kept
package_json = rb"""{
"version": "1.0.0",
"homepage": "http://example.org",
"author": {
"name": "foo",
"url": ""
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person"}],
"url": "http://example.org",
"version": "1.0.0",
}
# Case 3: both URLs empty -- neither appears in the result
package_json = rb"""{
"version": "1.0.0",
"homepage": "",
"author": {
"name": "foo",
"url": ""
}
}"""
result = MAPPINGS["NpmMapping"]().translate(package_json)
assert result == {
"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
"type": "SoftwareSourceCode",
"author": [{"name": "foo", "type": "Person"}],
"version": "1.0.0",
}
# Case 4: both URLs have a scheme but no "//host" part -- neither appears
# in the result
+ package_json = rb"""{
+ "version": "1.0.0",
+ "homepage": "http:example.org",
+ "author": {
+ "name": "foo",
+ "url": "http:example.com"
+ }
+}"""
+ result = MAPPINGS["NpmMapping"]().translate(package_json)
+ assert result == {
+ "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
+ "type": "SoftwareSourceCode",
+ "author": [{"name": "foo", "type": "Person"}],
+ "version": "1.0.0",
+ }
+
@settings(suppress_health_check=[HealthCheck.too_slow])
@given(json_document_strategy(keys=list(MAPPINGS["NpmMapping"].mapping)))  # type: ignore
def test_npm_adversarial(doc):
    """Feeds generated JSON documents to the npm mapping; the test passes as
    long as translating them does not raise."""
    serialized = json.dumps(doc)
    MAPPINGS["NpmMapping"]().translate(serialized.encode())
@pytest.mark.parametrize(
    "filename", [b"package.json", b"Package.json", b"PACKAGE.json", b"PACKAGE.JSON"]
)
def test_detect_metadata_package_json(filename):
    """Whatever the case of its name, a package.json entry is detected as
    input for the npm mapping."""
    unrelated_entry = {
        "sha1_git": b"abc",
        "name": b"index.js",
        "target": b"abc",
        "length": 897,
        "status": "visible",
        "type": "file",
        "perms": 33188,
        "dir_id": b"dir_a",
        "sha1": b"bcd",
    }
    package_entry = {
        "sha1_git": b"aab",
        "name": filename,
        "target": b"aab",
        "length": 712,
        "status": "visible",
        "type": "file",
        "perms": 33188,
        "dir_id": b"dir_a",
        "sha1": b"cde",
    }

    detected = detect_metadata([unrelated_entry, package_entry])

    assert {"NpmMapping": [b"cde"]} == detected

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:56 AM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3236401

Event Timeline