diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 0cabaf4..be2eae1 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,269 +1,272 @@
# Copyright (C) 2017-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar
from typing_extensions import TypedDict
import yaml
from swh.indexer.codemeta import compact, merge_values
from swh.indexer.namespaces import SCHEMA
from swh.indexer.storage.interface import Sha1
class DirectoryLsEntry(TypedDict):
    """One entry of a directory listing, as consumed by
    :meth:`BaseIntrinsicMapping.detect_metadata_files`."""

    target: Sha1
    sha1: Sha1
    name: bytes  # raw file name, compared case-insensitively against mapping filenames
    type: str
TTranslateCallable = TypeVar(
    "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None]
)


def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
    """Returns a decorator that marks the decorated function as adding
    the given terms to the ``translated_metadata`` dict"""

    def decorator(func: TTranslateCallable) -> TTranslateCallable:
        # Accumulate across stacked applications of this decorator, so a
        # function may declare terms in several @produce_terms(...) layers.
        terms = getattr(func, "produced_terms", None)
        if terms is None:
            terms = []
            func.produced_terms = terms  # type: ignore
        terms.extend(uris)
        return func

    return decorator
class BaseMapping:
    """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`,
    not to be inherited directly."""

    def __init__(self, log_suffix=""):
        self.log_suffix = log_suffix
        # One logger per concrete mapping class, named after its module + class.
        logger_name = f"{self.__class__.__module__}.{self.__class__.__name__}"
        self.log = logging.getLogger(logger_name)

    @property
    def name(self):
        """A name of this mapping, used as an identifier in the
        indexer storage."""
        raise NotImplementedError(f"{self.__class__.__name__}.name")

    def translate(self, file_content: bytes) -> Optional[Dict]:
        """Translates metadata, from the content of a file or of a RawExtrinsicMetadata
        object."""
        raise NotImplementedError(f"{self.__class__.__name__}.translate")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        # Subclasses decide how the JSON-LD is compacted (intrinsic vs extrinsic).
        raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")
class BaseExtrinsicMapping(BaseMapping):
    """Base class for extrinsic_metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
        """
        Returns the list of extrinsic metadata formats which can be translated
        by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        # Extrinsic metadata may use the ForgeFed vocabulary, hence forgefed=True.
        return compact(metadata, forgefed=True)
class BaseIntrinsicMapping(BaseMapping):
    """Base class for intrinsic-metadata mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        """
        Returns the sha1 hashes of files which can be translated by this mapping
        """
        raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

    def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        # Intrinsic metadata never uses the ForgeFed vocabulary.
        return compact(metadata, forgefed=False)
class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
    """Base class for all intrinsic metadata mappings that use a single file as input."""

    @property
    def filename(self):
        """The .json file to extract metadata from."""
        raise NotImplementedError(f"{self.__class__.__name__}.filename")

    @classmethod
    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
        # Keep only the first entry whose name matches (case-insensitively)
        # the filename this mapping handles.
        matches = [
            entry["sha1"]
            for entry in file_entries
            if entry["name"].lower() == cls.filename
        ]
        return matches[:1]
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    # List of fields that are simple strings, and don't need any normalization.
    string_fields = []  # type: List[str]

    @property
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        raise NotImplementedError(f"{self.__class__.__name__}.mapping")

    @staticmethod
    def _normalize_method_name(name: str) -> str:
        # Dict keys may contain dashes (eg. "author-email"); method names cannot.
        return name.replace("-", "_")

    @classmethod
    def supported_terms(cls):
        """Returns all CodeMeta terms this mapping may produce."""
        # one-to-one mapping from the original key to a CodeMeta term
        simple_terms = {
            term
            for (key, term) in cls.mapping.items()
            if key in cls.string_fields
            or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
        }
        # more complex mapping from the original key to JSON-LD
        complex_terms = {
            term
            for meth_name in dir(cls)
            if meth_name.startswith("translate_")
            for term in getattr(getattr(cls, meth_name), "produced_terms", [])
        }
        return simple_terms | complex_terms

    def _translate_dict(self, content_dict: Dict) -> Dict[str, str]:
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): content dict to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        translated_metadata = {"@type": SCHEMA.SoftwareSourceCode}
        for key, value in content_dict.items():
            method_suffix = self._normalize_method_name(key)
            # A dedicated translate_* method takes precedence over the
            # crosswalk table.
            translator = getattr(self, "translate_" + method_suffix, None)
            if translator:
                translator(translated_metadata, value)
                continue
            if key not in self.mapping:
                continue
            # The key is known from the crosswalk table.
            codemeta_key = self.mapping[key]
            # If there is a normalization method, use it on the value;
            # otherwise only plain strings (or lists thereof) pass through.
            normalizer = getattr(self, "normalize_" + method_suffix, None)
            if normalizer:
                value = normalizer(value)
            elif key in self.string_fields and isinstance(value, str):
                pass
            elif key in self.string_fields and isinstance(value, list):
                value = [item for item in value if isinstance(item, str)]
            else:
                continue
            # Set the translation metadata with the normalized value,
            # merging with any value produced for the same term earlier.
            if codemeta_key in translated_metadata:
                translated_metadata[codemeta_key] = merge_values(
                    translated_metadata[codemeta_key], value
                )
            else:
                translated_metadata[codemeta_key] = value

        self.extra_translation(translated_metadata, content_dict)

        return self.normalize_translation(translated_metadata)

    def extra_translation(self, translated_metadata: Dict[str, Any], d: Dict[str, Any]):
        """Called at the end of the translation process, and may add arbitrary keys
        to ``translated_metadata`` based on the input dictionary (passed as ``d``).
        """
        pass
class JsonMapping(DictMapping):
    """Base class for all mappings that use JSON data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict]:
        """
        Translates content by parsing content from a bytestring containing
        json data and translating with the appropriate mapping

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        try:
            text = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding from %s", self.log_suffix)
            return None
        try:
            content_dict = json.loads(text)
        except json.JSONDecodeError:
            self.log.warning("Error unjsoning from %s", self.log_suffix)
            return None
        # Top-level JSON values other than objects (lists, strings, ...)
        # cannot be translated by a dict-based mapping.
        if not isinstance(content_dict, dict):
            return None
        return self._translate_dict(content_dict)
class SafeLoader(yaml.SafeLoader):
    """A :class:`yaml.SafeLoader` that leaves timestamps as plain strings
    instead of parsing them into :class:`datetime.datetime` objects."""

    yaml_implicit_resolvers = {
        key: [
            resolver
            for resolver in resolvers
            if resolver[0] != "tag:yaml.org,2002:timestamp"
        ]
        for key, resolvers in yaml.SafeLoader.yaml_implicit_resolvers.items()
    }
class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
    """Base class for all mappings that use Yaml data as input."""

    def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
        # NOTE(review): unlike JsonMapping, a UnicodeDecodeError here is not
        # caught; it propagates to the caller, same as the original behavior.
        text: str = raw_content.decode()
        try:
            loaded = yaml.load(text, Loader=SafeLoader)
        except yaml.scanner.ScannerError:
            return None
        if not isinstance(loaded, dict):
            return None
        return self._translate_dict(loaded)
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index b9df888..40c9de4 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,163 +1,169 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from typing import Any, Dict, Optional
import xml.parsers.expat
import xmltodict
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import SCHEMA
from .base import DictMapping, SingleFileIntrinsicMapping
class MavenMapping(DictMapping, SingleFileIntrinsicMapping):
    """
    dedicated class for Maven (pom.xml) mapping and translation
    """

    name = "maven"
    filename = b"pom.xml"
    mapping = CROSSWALK_TABLE["Java (Maven)"]
    string_fields = ["name", "version", "description", "email"]

    # Implicit repository used when the pom declares none.
    _default_repository = {"url": "https://repo.maven.apache.org/maven2/"}

    def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
        """Parses a pom.xml document and translates its <project> element."""
        try:
            d = xmltodict.parse(content).get("project") or {}
        except xml.parsers.expat.ExpatError:
            self.log.warning("Error parsing XML from %s", self.log_suffix)
            return None
        except UnicodeDecodeError:
            self.log.warning("Error unidecoding XML from %s", self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
            return None
        if not isinstance(d, dict):
            self.log.warning("Skipping ill-formed XML content: %s", content)
            return None
        return self._translate_dict(d)

    def extra_translation(self, translated_metadata, d):
        # Repositories need the whole document (groupId/artifactId), so they
        # are handled here rather than through a translate_* method.
        repositories = self.parse_repositories(d)
        if repositories:
            translated_metadata[SCHEMA.codeRepository] = repositories

    def parse_repositories(self, d):
        """https://maven.apache.org/pom.html#Repositories

        >>> import xmltodict
        >>> from pprint import pprint
        >>> d = xmltodict.parse('''
        ... <repositories>
        ...   <repository>
        ...     <id>codehausSnapshots</id>
        ...     <name>Codehaus Snapshots</name>
        ...     <url>http://snapshots.maven.codehaus.org/maven2</url>
        ...     <layout>default</layout>
        ...   </repository>
        ... </repositories>
        ... ''')
        >>> MavenMapping().parse_repositories(d)
        """
        repositories = d.get("repositories")
        if not repositories:
            # No explicit repository: fall back to Maven Central.
            results = [self.parse_repository(d, self._default_repository)]
        elif isinstance(repositories, dict):
            repositories = repositories.get("repository") or []
            if not isinstance(repositories, list):
                repositories = [repositories]
            results = [self.parse_repository(d, repo) for repo in repositories]
        else:
            results = []
        return [res for res in results if res] or None

    def parse_repository(self, d, repo):
        if not isinstance(repo, dict):
            return
        if repo.get("layout", "default") != "default":
            return  # TODO ?
        parts = (repo.get("url"), d.get("groupId"), d.get("artifactId"))
        if all(isinstance(part, str) for part in parts):
            url, group_id, artifact_id = parts
            # Maven coordinates map onto the repository path:
            # <url>/<groupId with dots as slashes>/<artifactId>
            full_url = os.path.join(url, *group_id.split("."), artifact_id)
            return {"@id": full_url}

    def normalize_groupId(self, id_):
        """https://maven.apache.org/pom.html#Maven_Coordinates

        >>> MavenMapping().normalize_groupId('org.example')
        {'@id': 'org.example'}
        """
        if isinstance(id_, str):
            return {"@id": id_}

    def translate_licenses(self, translated_metadata, d):
        licenses = self.parse_licenses(d)
        if licenses:
            translated_metadata[SCHEMA.license] = licenses

    def parse_licenses(self, licenses):
        """https://maven.apache.org/pom.html#Licenses

        >>> import xmltodict
        >>> import json
        >>> d = xmltodict.parse('''
        ... <licenses>
        ...   <license>
        ...     <name>Apache License, Version 2.0</name>
        ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...   </license>
        ... </licenses>
        ... ''')
        >>> print(json.dumps(d, indent=4))
        {
            "licenses": {
                "license": {
                    "name": "Apache License, Version 2.0",
                    "url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
                }
            }
        }
        >>> MavenMapping().parse_licenses(d["licenses"])
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]

        or, if there are more than one license:

        >>> import xmltodict
        >>> from pprint import pprint
        >>> d = xmltodict.parse('''
        ... <licenses>
        ...   <license>
        ...     <name>Apache License, Version 2.0</name>
        ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...   </license>
        ...   <license>
        ...     <name>MIT License</name>
        ...     <url>https://opensource.org/licenses/MIT</url>
        ...   </license>
        ... </licenses>
        ... ''')
        >>> pprint(MavenMapping().parse_licenses(d["licenses"]))
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
         {'@id': 'https://opensource.org/licenses/MIT'}]
        """
        if not isinstance(licenses, dict):
            return
        licenses = licenses.get("license")
        if isinstance(licenses, dict):
            # A single <license> element parses as a dict, not a list.
            licenses = [licenses]
        elif not isinstance(licenses, list):
            return
        return [
            {"@id": license["url"]}
            for license in licenses
            if isinstance(license, dict) and isinstance(license.get("url"), str)
        ] or None
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
index bd563ca..1eb27ae 100644
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -1,78 +1,78 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import email.parser
import email.policy
import itertools
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.namespaces import SCHEMA
from .base import DictMapping, SingleFileIntrinsicMapping
_normalize_pkginfo_key = str.lower
class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy):
def header_fetch_parse(self, name, value):
if hasattr(value, "name"):
return value
value = value.replace("\n ", "\n")
return self.header_factory(name, value)
class PythonPkginfoMapping(DictMapping, SingleFileIntrinsicMapping):
    """Dedicated class for Python's PKG-INFO mapping and translation.

    https://www.python.org/dev/peps/pep-0314/"""

    name = "pkg-info"
    filename = b"PKG-INFO"
    mapping = {
        _normalize_pkginfo_key(k): v
        for (k, v) in CROSSWALK_TABLE["Python PKG-INFO"].items()
    }
    string_fields = [
        "name",
        "version",
        "description",
        "summary",
        "author",
        "author-email",
    ]

    _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())

    def translate(self, content):
        """Parses PKG-INFO headers and translates them to CodeMeta."""
        msg = self._parser.parsebytes(content)
        fields = {}
        for (key, value) in msg.items():
            normalized_key = _normalize_pkginfo_key(key)
            # "UNKNOWN" is setuptools' placeholder for missing values.
            if value != "UNKNOWN":
                fields.setdefault(normalized_key, []).append(value)
        return self._translate_dict(fields)

    def extra_translation(self, translated_metadata, d):
        # Merge the separately-translated author name and email into a
        # single schema:Person object.
        author_name = translated_metadata.pop(SCHEMA.author, None)
        author_email = translated_metadata.pop(SCHEMA.email, None)
        if author_name or author_email:
            translated_metadata[SCHEMA.author] = {
                "@list": [
                    {
                        "@type": SCHEMA.Person,
                        SCHEMA.name: author_name,
                        SCHEMA.email: author_email,
                    }
                ]
            }

    def normalize_home_page(self, urls):
        return [{"@id": url} for url in urls]

    def normalize_keywords(self, keywords):
        # Each keyword header value is a space-separated list of words.
        return [word for entry in keywords for word in entry.split(" ")]

    def normalize_license(self, licenses):
        return [{"@id": lic} for lic in licenses]