diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index be2eae1..5208745 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,272 +1,308 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
+import xml.parsers.expat
from typing_extensions import TypedDict
+import xmltodict
import yaml
from swh.indexer.codemeta import compact, merge_values
from swh.indexer.namespaces import SCHEMA
from swh.indexer.storage.interface import Sha1
class DirectoryLsEntry(TypedDict):
    """One entry of a directory listing, as consumed by
    :meth:`BaseIntrinsicMapping.detect_metadata_files`."""

    target: Sha1
    sha1: Sha1  # id of the file content; mappings return these to select files
    name: bytes  # raw entry name; compared against mapping-specific filenames
    type: str  # entry type (presumably "file"/"dir" — not used in this module)
# Type of the translate_* methods decorated by produce_terms: they take
# (self, translated_metadata, value) and mutate translated_metadata in place.
TTranslateCallable = TypeVar(
    "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None]
)


def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Return a decorator that records *uris* as the terms the decorated
    function adds to the ``translated_metadata`` dict.

    The accumulated list is stored on the function itself, under the
    ``produced_terms`` attribute, so stacked decorations extend it."""

    def decorator(func: TTranslateCallable) -> TTranslateCallable:
        produced = getattr(func, "produced_terms", None)
        if produced is None:
            produced = []
            func.produced_terms = produced  # type: ignore
        produced.extend(uris)
        return func

    return decorator
class BaseMapping:
    """Shared plumbing for :class:`BaseExtrinsicMapping` and
    :class:`BaseIntrinsicMapping`; not meant to be inherited directly."""

    def __init__(self, log_suffix=""):
        self.log_suffix = log_suffix
        # one logger per concrete mapping class, named after it
        logger_name = f"{self.__class__.__module__}.{self.__class__.__name__}"
        self.log = logging.getLogger(logger_name)

    @property
    def name(self):
        """A name of this mapping, used as an identifier in the
        indexer storage."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")

    def translate(self, file_content: bytes) -> Optional[Dict]:
        """Translates metadata, from the content of a file or of a
        RawExtrinsicMetadata object."""
        raise NotImplementedError(f"{self.__class__.__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")
class BaseExtrinsicMapping(BaseMapping):
    """Base class for extrinsic-metadata mappings.

    To implement a new mapping, inherit from this class and override
    the translate function."""

    @classmethod
    def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
        """Return the extrinsic metadata formats which can be translated
        by this mapping."""
        raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        # extrinsic metadata may use ForgeFed vocabulary, so compact with it
        return compact(metadata, forgefed=True)
class BaseIntrinsicMapping(BaseMapping):
    """Base class for intrinsic-metadata mappings.

    To implement a new mapping, inherit from this class and override
    the translate function."""

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """Return the sha1 hashes of the files this mapping can translate."""
        raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        # intrinsic metadata does not carry ForgeFed vocabulary
        return compact(metadata, forgefed=False)
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file
    as input."""

    @property
    def filename(self):
        """The .json file to extract metadata from."""
        raise NotImplementedError(f"{self.__class__.__name__}.filename")

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        # case-insensitive match against this mapping's fixed file name;
        # at most one file is ever selected
        matching = [
            entry["sha1"]
            for entry in file_entries
            if entry["name"].lower() == cls.filename
        ]
        return matching[:1]
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    string_fields = []  # type: List[str]
    """List of fields that are simple strings, and don't need any
    normalization."""

    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")

    @staticmethod
    def _normalize_method_name(name: str) -> str:
        # make a source key usable as a Python identifier suffix, so that
        # key "<k>" can be dispatched to translate_<k>/normalize_<k> methods
        return name.replace("-", "_")

    @classmethod
    def supported_terms(cls):
        """Returns the set of CodeMeta terms this mapping can produce."""
        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            term
            for (key, term) in cls.mapping.items()
            if key in cls.string_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }

        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            term
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }

        return simple_terms | complex_terms

    def _translate_dict(self, content_dict: Dict) -> Dict[str, Any]:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): content dict to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer

        """
        translated_metadata = {"@type": SCHEMA.SoftwareSourceCode}
        for k, v in content_dict.items():
            # First, check if there is a specific translation
            # method for this key
            translation_method = getattr(
                self, "translate_" + self._normalize_method_name(k), None
            )
            if translation_method:
                # the translate_* method mutates translated_metadata itself
                translation_method(translated_metadata, v)
            elif k in self.mapping:
                # if there is no method, but the key is known from the
                # crosswalk table
                codemeta_key = self.mapping[k]

                # if there is a normalization method, use it on the value
                normalization_method = getattr(
                    self, "normalize_" + self._normalize_method_name(k), None
                )
                if normalization_method:
                    v = normalization_method(v)
                elif k in self.string_fields and isinstance(v, str):
                    # plain string field: keep the value as-is
                    pass
                elif k in self.string_fields and isinstance(v, list):
                    # keep only the string items of a heterogeneous list
                    v = [x for x in v if isinstance(x, str)]
                else:
                    # known key but unusable value shape: drop it entirely
                    continue

                # set the translation metadata with the normalized value
                if codemeta_key in translated_metadata:
                    translated_metadata[codemeta_key] = merge_values(
                        translated_metadata[codemeta_key], v
                    )
                else:
                    translated_metadata[codemeta_key] = v

        # give subclasses a chance to add keys based on the whole input dict
        self.extra_translation(translated_metadata, content_dict)

        return self.normalize_translation(translated_metadata)

    def extra_translation(self, translated_metadata: Dict[str, Any], d: Dict[str, Any]):
        """Called at the end of the translation process, and may add arbitrary keys
        to ``translated_metadata`` based on the input dictionary (passed as ``d``).
        """
        pass
class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Parse ``raw_content`` as a JSON document and translate it with
        the appropriate mapping.

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer, or None if the content could not be decoded or
            parsed, or is not a JSON object.
        """
        try:
            decoded = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            parsed = json.loads(decoded)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None
        if not isinstance(parsed, dict):
            # valid JSON, but not an object: nothing to translate
            return None
        return self._translate_dict(parsed)
class XmlMapping(DictMapping):
    """Base class for all mappings that use XML data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """Parse ``raw_content`` as an XML document and translate it with
        the appropriate mapping.

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer, or None on undecodable/unparseable input.
        """
        try:
            parsed = xmltodict.parse(raw_content)
        except xml.parsers.expat.ExpatError:
            self.log.warning("Error parsing XML from %s", self.log_suffix)
            return None
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding XML from %s", self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
            return None
        if isinstance(parsed, dict):
            return self._translate_dict(parsed)
        self.log.warning("Skipping ill-formed XML content: %s", raw_content)
        return None
+
+
class SafeLoader(yaml.SafeLoader):
    """A SafeLoader variant that does not resolve timestamps, so YAML
    date/time scalars are loaded as plain strings rather than objects."""

    yaml_implicit_resolvers = {
        prefix: [
            resolver
            for resolver in resolvers
            if resolver[0] != "tag:yaml.org,2002:timestamp"
        ]
        for prefix, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }
class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
    """Base class for all mappings that use Yaml data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        """Parse ``raw_content`` as a YAML document and translate it with
        the appropriate mapping.

        Like the JSON/XML mappings, logs a warning and returns None
        (instead of raising) on undecodable or unparseable content.

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer, or None on invalid input.
        """
        try:
            raw_content_string: str = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            # yaml.YAMLError is the common base of ScannerError, ParserError,
            # ComposerError, ...; catching only ScannerError let other
            # malformed documents (eg. b"{") crash the indexer.
            content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
        except yaml.YAMLError:
            self.log.warning("Error parsing YAML from %s", self.log_suffix)
            return None
        if isinstance(content_dict, dict):
            return self._translate_dict(content_dict)
        return None
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index 40c9de4..179538b 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,169 +1,151 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
-from typing import Any, Dict, Optional
-import xml.parsers.expat
-
-import xmltodict
+from typing import Any, Dict
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import SCHEMA
-from .base import DictMapping, SingleFileIntrinsicMapping
+from .base import SingleFileIntrinsicMapping, XmlMapping
class MavenMapping(XmlMapping, SingleFileIntrinsicMapping):
    """
    dedicated class for Maven (pom.xml) mapping and translation
    """

    name = "maven"
    filename = b"pom.xml"
    mapping = CROSSWALK_TABLE["Java (Maven)"]
    string_fields = ["name", "version", "description", "email"]

    # repository assumed when the POM declares no <repositories> element
    _default_repository = {"url": "https://repo.maven.apache.org/maven2/"}

    def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
        # a POM is rooted in a single <project> element; translate its
        # children rather than the document itself
        return super()._translate_dict(d.get("project") or {})

    def extra_translation(self, translated_metadata, d):
        # hook called by DictMapping._translate_dict after key-by-key
        # translation; adds the repository URL(s) computed from the POM
        repositories = self.parse_repositories(d)
        if repositories:
            translated_metadata[SCHEMA.codeRepository] = repositories

    def parse_repositories(self, d):
        """https://maven.apache.org/pom.html#Repositories

        >>> import xmltodict
        >>> from pprint import pprint
        >>> d = xmltodict.parse('''
        ... <repositories>
        ...   <repository>
        ...     <id>codehausSnapshots</id>
        ...     <name>Codehaus Snapshots</name>
        ...     <url>http://snapshots.maven.codehaus.org/maven2</url>
        ...     <layout>default</layout>
        ...   </repository>
        ... </repositories>
        ... ''')
        >>> MavenMapping().parse_repositories(d)
        """
        repositories = d.get("repositories")
        if not repositories:
            # no <repositories> element: fall back to Maven Central
            results = [self.parse_repository(d, self._default_repository)]
        elif isinstance(repositories, dict):
            # xmltodict returns a dict for a single <repository> child,
            # a list when there are several
            repositories = repositories.get("repository") or []
            if not isinstance(repositories, list):
                repositories = [repositories]
            results = [self.parse_repository(d, repo) for repo in repositories]
        else:
            results = []
        return [res for res in results if res] or None

    def parse_repository(self, d, repo):
        # builds the artifact URL from the repository base URL and the
        # project's Maven coordinates; returns None for anything unusable
        if not isinstance(repo, dict):
            return
        if repo.get("layout", "default") != "default":
            return  # TODO ?
        url = repo.get("url")
        group_id = d.get("groupId")
        artifact_id = d.get("artifactId")
        if (
            isinstance(url, str)
            and isinstance(group_id, str)
            and isinstance(artifact_id, str)
        ):
            # group id dots become path components, per Maven repo layout
            repo = os.path.join(url, *group_id.split("."), artifact_id)
            return {"@id": repo}

    def normalize_groupId(self, id_):
        """https://maven.apache.org/pom.html#Maven_Coordinates

        >>> MavenMapping().normalize_groupId('org.example')
        {'@id': 'org.example'}
        """
        if isinstance(id_, str):
            return {"@id": id_}

    def translate_licenses(self, translated_metadata, d):
        licenses = self.parse_licenses(d)
        if licenses:
            translated_metadata[SCHEMA.license] = licenses

    def parse_licenses(self, licenses):
        """https://maven.apache.org/pom.html#Licenses

        >>> import xmltodict
        >>> import json
        >>> d = xmltodict.parse('''
        ... <licenses>
        ...   <license>
        ...     <name>Apache License, Version 2.0</name>
        ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...   </license>
        ... </licenses>
        ... ''')
        >>> print(json.dumps(d, indent=4))
        {
            "licenses": {
                "license": {
                    "name": "Apache License, Version 2.0",
                    "url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
                }
            }
        }
        >>> MavenMapping().parse_licenses(d["licenses"])
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]

        or, if there are more than one license:

        >>> import xmltodict
        >>> from pprint import pprint
        >>> d = xmltodict.parse('''
        ... <licenses>
        ...   <license>
        ...     <name>Apache License, Version 2.0</name>
        ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...   </license>
        ...   <license>
        ...     <name>MIT License</name>
        ...     <url>https://opensource.org/licenses/MIT</url>
        ...   </license>
        ... </licenses>
        ... ''')
        >>> pprint(MavenMapping().parse_licenses(d["licenses"]))
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
         {'@id': 'https://opensource.org/licenses/MIT'}]
        """
        if not isinstance(licenses, dict):
            return
        licenses = licenses.get("license")
        if isinstance(licenses, dict):
            # single <license> child: normalize to a one-element list
            licenses = [licenses]
        elif not isinstance(licenses, list):
            return
        return [
            {"@id": license["url"]}
            for license in licenses
            if isinstance(license, dict) and isinstance(license.get("url"), str)
        ] or None
diff --git a/swh/indexer/metadata_dictionary/nuget.py b/swh/indexer/metadata_dictionary/nuget.py
index 05b95d4..470a972 100644
--- a/swh/indexer/metadata_dictionary/nuget.py
+++ b/swh/indexer/metadata_dictionary/nuget.py
@@ -1,105 +1,98 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os.path
import re
-from typing import Any, Dict, List, Optional
-
-import xmltodict
+from typing import Any, Dict, List
from swh.indexer.codemeta import _DATA_DIR, _read_crosstable
from swh.indexer.namespaces import SCHEMA
from swh.indexer.storage.interface import Sha1
-from .base import BaseIntrinsicMapping, DictMapping, DirectoryLsEntry
+from .base import BaseIntrinsicMapping, DirectoryLsEntry, XmlMapping
# Load the NuGet -> CodeMeta crosswalk table shipped with the package.
NUGET_TABLE_PATH = os.path.join(_DATA_DIR, "nuget.csv")
with open(NUGET_TABLE_PATH) as fd:
    (CODEMETA_TERMS, NUGET_TABLE) = _read_crosstable(fd)
class NuGetMapping(XmlMapping, BaseIntrinsicMapping):
    """
    dedicated class for NuGet (.nuspec) mapping and translation
    """

    name = "nuget"
    mapping = NUGET_TABLE["NuGet"]
    mapping["copyright"] = "http://schema.org/copyrightNotice"
    mapping["language"] = "http://schema.org/inLanguage"
    string_fields = [
        "description",
        "version",
        "projectUrl",
        "name",
        "tags",
        "license",
        "licenseUrl",
        "summary",
        "copyright",
        "language",
    ]

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """Return the sha1 of the first ``*.nuspec`` file found, if any."""
        for entry in file_entries:
            if entry["name"].endswith(b".nuspec"):
                return [entry["sha1"]]
        return []

    def _translate_dict(self, d: Dict[str, Any]) -> Dict[str, Any]:
        """Unwrap the ``<package><metadata>`` envelope before translating.

        Guards against ill-formed documents where "package" or "metadata"
        holds text instead of an element, which would otherwise raise
        AttributeError/unexpected iteration errors.
        """
        package = d.get("package")
        metadata = package.get("metadata") if isinstance(package, dict) else None
        if not isinstance(metadata, dict):
            self.log.warning("Skipping ill-formed XML content: %s", d)
            metadata = {}
        return super()._translate_dict(metadata)

    def normalize_projectUrl(self, s):
        """Turn a URL string into a JSON-LD id node."""
        if isinstance(s, str):
            return {"@id": s}

    def translate_repository(self, translated_metadata, v):
        # <repository url="..."/> -> codeRepository; use .get so an element
        # without a url attribute is ignored instead of raising KeyError
        if isinstance(v, dict) and isinstance(v.get("@url"), str):
            codemeta_key = self.mapping["repository.url"]
            translated_metadata[codemeta_key] = {"@id": v["@url"]}

    def normalize_license(self, v):
        """Map a ``<license type="expression">`` element to SPDX ids.

        Expressions using WITH/AND/parentheses are skipped (returns None);
        OR-expressions are split into a list of alternatives.
        """
        # .get avoids KeyError when the element lacks a type attribute
        if not (isinstance(v, dict) and v.get("@type") == "expression"):
            return None
        license_string = v.get("#text")
        if not isinstance(license_string, str):
            return None
        if re.search(r" with |\(|\)| and ", license_string, re.IGNORECASE):
            return None
        return [
            {"@id": "https://spdx.org/licenses/" + license_type.strip()}
            for license_type in re.split(r" or ", license_string, flags=re.IGNORECASE)
        ]

    def normalize_licenseUrl(self, s):
        """Turn a license URL string into a JSON-LD id node."""
        if isinstance(s, str):
            return {"@id": s}

    def normalize_authors(self, s):
        """Split a comma-separated author string into schema.org Persons."""
        if isinstance(s, str):
            author_names = [a.strip() for a in s.split(",")]
            authors = [
                {"@type": SCHEMA.Person, SCHEMA.name: name} for name in author_names
            ]
            return {"@list": authors}

    def translate_releaseNotes(self, translated_metadata, s):
        # accumulate release notes; there may be one per translated document
        if isinstance(s, str):
            translated_metadata.setdefault("http://schema.org/releaseNotes", []).append(
                s
            )

    def normalize_tags(self, s):
        """Space-separated tag string -> list of keywords."""
        if isinstance(s, str):
            return s.split(" ")