diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
index 81df82a..d33bc98 100644
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,40 +1,40 @@
import collections
import click
from . import cff, codemeta, maven, npm, python, ruby
MAPPINGS = {
"CodemetaMapping": codemeta.CodemetaMapping,
"MavenMapping": maven.MavenMapping,
"NpmMapping": npm.NpmMapping,
"PythonPkginfoMapping": python.PythonPkginfoMapping,
"GemspecMapping": ruby.GemspecMapping,
"CffMapping": cff.CffMapping,
}
def list_terms():
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
d = collections.defaultdict(set)
for mapping in MAPPINGS.values():
for term in mapping.supported_terms():
d[term].add(mapping)
return d
@click.command()
@click.argument("mapping_name")
@click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str):
from pprint import pprint
with open(file_name, "rb") as fd:
file_content = fd.read()
res = MAPPINGS[mapping_name]().translate(file_content)
pprint(res)
if __name__ == "__main__":
main()
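For reference, a minimal sketch of how the freshly annotated entry points of this module are exercised outside the click CLI (the package.json path and its contents are hypothetical, and swh.indexer must be installed):

from swh.indexer.metadata_dictionary import MAPPINGS, list_terms

# Same lookup-and-translate flow as main(), without the CLI wrapper.
with open("package.json", "rb") as fd:  # hypothetical input file
    raw = fd.read()
metadata = MAPPINGS["NpmMapping"]().translate(raw)
# translate() returns a CodeMeta/JSON-LD dict, or None if the file could not
# be decoded or parsed.
print(metadata)

# list_terms() maps each supported CodeMeta term to the set of mapping
# classes that can produce it.
print(sorted(list_terms().keys())[:5])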
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 0a2becf..774875c 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,177 +1,180 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
-from typing import List
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
class BaseMapping:
"""Base class for mappings to inherit from
To implement a new mapping:
- inherit this class
- override translate function
"""
def __init__(self, log_suffix=""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
)
@property
def name(self):
"""A name of this mapping, used as an identifier in the
indexer storage."""
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files):
+ def detect_metadata_files(cls, files: List[Dict[str, Any]]) -> List[Any]:
"""
Detects files potentially containing metadata
Args:
files (list): list of file entries
Returns:
list: list of sha1 (possibly empty)
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content):
+ def translate(self, file_content: bytes) -> Optional[Dict]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
- def normalize_translation(self, metadata):
+ def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata)
class SingleFileMapping(BaseMapping):
"""Base class for all mappings that use a single file as input."""
@property
def filename(self):
"""The .json file to extract metadata from."""
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[Any]:
for entry in file_entries:
if entry["name"].lower() == cls.filename.lower():
return [entry["sha1"]]
return []
class DictMapping(BaseMapping):
"""Base class for mappings that take as input a file that is mostly
a key-value store (e.g. a shallow JSON dict)."""
string_fields = [] # type: List[str]
"""List of fields that are simple strings, and don't need any
normalization."""
@property
def mapping(self):
"""A translation dict to map dict keys into a canonical name."""
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name):
+ def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls):
return {
term
for (key, term) in cls.mapping.items()
if key in cls.string_fields
or hasattr(cls, "translate_" + cls._normalize_method_name(key))
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
- def _translate_dict(self, content_dict, *, normalize=True):
+ def _translate_dict(
+ self, content_dict: Dict, *, normalize: bool = True
+ ) -> Dict[str, Any]:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
Args:
content_dict (dict): content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}
for k, v in content_dict.items():
# First, check if there is a specific translation
# method for this key
translation_method = getattr(
self, "translate_" + self._normalize_method_name(k), None
)
if translation_method:
translation_method(translated_metadata, v)
elif k in self.mapping:
# if there is no method, but the key is known from the
# crosswalk table
codemeta_key = self.mapping[k]
# if there is a normalization method, use it on the value
normalization_method = getattr(
self, "normalize_" + self._normalize_method_name(k), None
)
if normalization_method:
v = normalization_method(v)
elif k in self.string_fields and isinstance(v, str):
pass
elif k in self.string_fields and isinstance(v, list):
v = [x for x in v if isinstance(x, str)]
else:
continue
# set the translation metadata with the normalized value
if codemeta_key in translated_metadata:
translated_metadata[codemeta_key] = merge_values(
translated_metadata[codemeta_key], v
)
else:
translated_metadata[codemeta_key] = v
if normalize:
return self.normalize_translation(translated_metadata)
else:
return translated_metadata
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Optional[Dict]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
Args:
raw_content (bytes): raw content to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
try:
- raw_content = raw_content.decode()
+ raw_content_string: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
try:
- content_dict = json.loads(raw_content)
+ content_dict = json.loads(raw_content_string)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
- return
+ return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
+ return None
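To illustrate how _translate_dict dispatches between translate_*/normalize_* methods, the crosswalk mapping, and string_fields, here is a minimal toy subclass (ToyMapping and its "description" key are invented for the example; it is not part of the patch):

from swh.indexer.codemeta import SCHEMA_URI
from swh.indexer.metadata_dictionary.base import DictMapping

class ToyMapping(DictMapping):
    # Hypothetical mapping, defined only to exercise _translate_dict.
    name = "toy"
    # crosswalk: source key -> CodeMeta term
    mapping = {"description": SCHEMA_URI + "description"}
    # plain string values that need no normalize_* method
    string_fields = ["description"]

translated = ToyMapping()._translate_dict(
    {"description": "an example", "unknown-key": 42}
)
# Keys absent from `mapping` are dropped; the result is compacted against the
# CodeMeta context, roughly {"type": "SoftwareSourceCode",
#                            "description": "an example"}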
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
index 43f944d..934579e 100644
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,65 +1,69 @@
+from typing import Any, Dict, List, Optional, Union
+
import yaml
from swh.indexer.codemeta import CODEMETA_CONTEXT_URL, CROSSWALK_TABLE, SCHEMA_URI
from .base import DictMapping, SingleFileMapping
yaml.SafeLoader.yaml_implicit_resolvers = {
k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
}
class CffMapping(DictMapping, SingleFileMapping):
"""Dedicated class for Citation (CITATION.cff) mapping and translation"""
name = "cff"
filename = b"CITATION.cff"
mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
string_fields = ["keywords", "license", "abstract", "version", "doi"]
- def translate(self, raw_content):
- raw_content = raw_content.decode()
- content_dict = yaml.load(raw_content, Loader=yaml.SafeLoader)
+ def translate(self, raw_content: bytes) -> Dict[str, Any]:
+ raw_content_string: str = raw_content.decode()
+ content_dict = yaml.load(raw_content_string, Loader=yaml.SafeLoader)
metadata = self._translate_dict(content_dict)
metadata["@context"] = CODEMETA_CONTEXT_URL
return metadata
- def normalize_authors(self, d):
+ def normalize_authors(self, d: List[dict]) -> Dict[str, list]:
result = []
for author in d:
- author_data = {"@type": SCHEMA_URI + "Person"}
+ author_data: Dict[str, Optional[Union[str, Dict]]] = {
+ "@type": SCHEMA_URI + "Person"
+ }
if "orcid" in author:
author_data["@id"] = author["orcid"]
if "affiliation" in author:
author_data[SCHEMA_URI + "affiliation"] = {
"@type": SCHEMA_URI + "Organization",
SCHEMA_URI + "name": author["affiliation"],
}
if "family-names" in author:
author_data[SCHEMA_URI + "familyName"] = author["family-names"]
if "given-names" in author:
author_data[SCHEMA_URI + "givenName"] = author["given-names"]
result.append(author_data)
- result = {"@list": result}
- return result
+ return {"@list": result}
- def normalize_doi(self, s):
+ def normalize_doi(self, s: str) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://doi.org/" + s}
- def normalize_license(self, s):
+ def normalize_license(self, s: str) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
- def normalize_repository_code(self, s):
+ def normalize_repository_code(self, s: str) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": s}
- def normalize_date_released(self, s):
+ def normalize_date_released(self, s: str) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@value": s, "@type": SCHEMA_URI + "Date"}
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
index a93d6d5..0bbb3fa 100644
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -1,30 +1,31 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CODEMETA_TERMS, expand
from .base import SingleFileMapping
class CodemetaMapping(SingleFileMapping):
"""
dedicated class for CodeMeta (codemeta.json) mapping and translation
"""
name = "codemeta"
filename = b"codemeta.json"
string_fields = None
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
return None
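A minimal sketch of the annotated CodemetaMapping.translate (the codemeta.json document here is invented):

import json
from swh.indexer.metadata_dictionary.codemeta import CodemetaMapping

doc = {
    "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
    "name": "example-project",
    "license": "https://spdx.org/licenses/GPL-3.0",
}
# translate() expands and re-compacts the document; any decoding or JSON
# error is swallowed and None is returned, which is what the new
# Optional[Dict[str, Any]] annotation documents.
print(CodemetaMapping().translate(json.dumps(doc).encode()))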
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index f5f6d9b..ad4c5ed 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,161 +1,162 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
+from typing import Any, Dict, Optional
import xml.parsers.expat
import xmltodict
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
from .base import DictMapping, SingleFileMapping
class MavenMapping(DictMapping, SingleFileMapping):
"""
dedicated class for Maven (pom.xml) mapping and translation
"""
name = "maven"
filename = b"pom.xml"
mapping = CROSSWALK_TABLE["Java (Maven)"]
string_fields = ["name", "version", "description", "email"]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
d = xmltodict.parse(content).get("project") or {}
except xml.parsers.expat.ExpatError:
self.log.warning("Error parsing XML from %s", self.log_suffix)
return None
except UnicodeDecodeError:
self.log.warning("Error unidecoding XML from %s", self.log_suffix)
return None
except (LookupError, ValueError):
# unknown encoding or multi-byte encoding
self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
return None
if not isinstance(d, dict):
self.log.warning("Skipping ill-formed XML content: %s", content)
return None
metadata = self._translate_dict(d, normalize=False)
metadata[SCHEMA_URI + "codeRepository"] = self.parse_repositories(d)
metadata[SCHEMA_URI + "license"] = self.parse_licenses(d)
return self.normalize_translation(metadata)
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
def parse_repositories(self, d):
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
>>> from pprint import pprint
>>> d = xmltodict.parse('''
... <repositories>
...   <repository>
...     <id>codehausSnapshots</id>
...     <name>Codehaus Snapshots</name>
...     <url>http://snapshots.maven.codehaus.org/maven2</url>
...     <layout>default</layout>
...   </repository>
... </repositories>
... ''')
>>> MavenMapping().parse_repositories(d)
"""
repositories = d.get("repositories")
if not repositories:
results = [self.parse_repository(d, self._default_repository)]
elif isinstance(repositories, dict):
repositories = repositories.get("repository") or []
if not isinstance(repositories, list):
repositories = [repositories]
results = [self.parse_repository(d, repo) for repo in repositories]
else:
results = []
return [res for res in results if res] or None
def parse_repository(self, d, repo):
if not isinstance(repo, dict):
return
if repo.get("layout", "default") != "default":
return # TODO ?
url = repo.get("url")
group_id = d.get("groupId")
artifact_id = d.get("artifactId")
if (
isinstance(url, str)
and isinstance(group_id, str)
and isinstance(artifact_id, str)
):
repo = os.path.join(url, *group_id.split("."), artifact_id)
return {"@id": repo}
def normalize_groupId(self, id_):
"""https://maven.apache.org/pom.html#Maven_Coordinates
>>> MavenMapping().normalize_groupId('org.example')
{'@id': 'org.example'}
"""
if isinstance(id_, str):
return {"@id": id_}
def parse_licenses(self, d):
"""https://maven.apache.org/pom.html#Licenses
>>> import xmltodict
>>> import json
>>> d = xmltodict.parse('''
... <licenses>
...   <license>
...     <name>Apache License, Version 2.0</name>
...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
...   </license>
... </licenses>
... ''')
>>> print(json.dumps(d, indent=4))
{
"licenses": {
"license": {
"name": "Apache License, Version 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
}
}
}
>>> MavenMapping().parse_licenses(d)
[{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]
or, if there is more than one license:
>>> import xmltodict
>>> from pprint import pprint
>>> d = xmltodict.parse('''
... <licenses>
...   <license>
...     <name>Apache License, Version 2.0</name>
...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
...   </license>
...   <license>
...     <name>MIT License</name>
...     <url>https://opensource.org/licenses/MIT</url>
...   </license>
... </licenses>
... ''')
>>> pprint(MavenMapping().parse_licenses(d))
[{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
{'@id': 'https://opensource.org/licenses/MIT'}]
"""
licenses = d.get("licenses")
if not isinstance(licenses, dict):
return
licenses = licenses.get("license")
if isinstance(licenses, dict):
licenses = [licenses]
elif not isinstance(licenses, list):
return
return [
{"@id": license["url"]}
for license in licenses
if isinstance(license, dict) and isinstance(license.get("url"), str)
] or None
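Finally, a minimal sketch exercising the annotated MavenMapping.translate (the pom.xml below is invented):

from swh.indexer.metadata_dictionary.maven import MavenMapping

# Invented pom.xml, for illustration only.
raw = b"""\
<project>
  <groupId>org.example</groupId>
  <artifactId>example-tool</artifactId>
  <version>1.2.3</version>
  <name>example-tool</name>
  <licenses>
    <license>
      <name>Apache License, Version 2.0</name>
      <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
    </license>
  </licenses>
</project>
"""
metadata = MavenMapping().translate(raw)
# With no <repositories> element, codeRepository falls back to the default
# Maven repository URL joined with groupId and artifactId; the license URL is
# emitted as an @id. Malformed XML makes translate() return None, matching
# the Optional[Dict[str, Any]] annotation.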