Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import re | import re | ||||
import abc | import abc | ||||
import ast | import ast | ||||
import json | import json | ||||
import logging | import logging | ||||
import itertools | import itertools | ||||
import email.parser | import email.parser | ||||
import xml.parsers.expat | |||||
import email.policy | import email.policy | ||||
import click | import click | ||||
import xmltodict | import defusedxml.ElementTree | ||||
import xmlschema | |||||
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | ||||
from swh.indexer.codemeta import compact, expand | from swh.indexer.codemeta import compact, expand | ||||
from swh.indexer.exc import UnknownNamespace | |||||
# Registry of mapping classes, keyed by class name; filled in by the
# ``register_mapping`` decorator below.
MAPPINGS = {}


def register_mapping(cls):
    """Class decorator that records ``cls`` in :data:`MAPPINGS`.

    The class is stored under its ``__name__`` and returned unchanged, so
    the decorated name still refers to the original class.
    """
    MAPPINGS[cls.__name__] = cls
    return cls
▲ Show 20 Lines • Show All 102 Lines • ▼ Show 20 Lines | class DictMapping(BaseMapping): | ||||
normalization.''' | normalization.''' | ||||
@property
@abc.abstractmethod
def mapping(self):
    """A translation dict to map dict keys into a canonical name.

    Abstract: concrete mappings must provide it (a plain class attribute
    holding a dict is enough).
    """
def _normalize_key(self, key):
    """Normalizes a key of the input dictionary so it can be mapped to
    a method name.

    Dashes are replaced with underscores, since the result is appended to
    ``translate_`` / ``normalize_`` to look up a method by name.
    """
    return key.replace('-', '_')
def _translate_dict(self, content_dict, *, normalize=True): | def _translate_dict(self, content_dict, *, normalize=True): | ||||
""" | """ | ||||
Translates content by parsing content from a dict object | Translates content by parsing content from a dict object | ||||
and translating with the appropriate mapping | and translating with the appropriate mapping | ||||
Args: | Args: | ||||
content_dict (dict): content dict to translate | content_dict (dict): content dict to translate | ||||
Returns: | Returns: | ||||
dict: translated metadata in json-friendly form needed for | dict: translated metadata in json-friendly form needed for | ||||
the indexer | the indexer | ||||
""" | """ | ||||
translated_metadata = {'@type': SCHEMA_URI + 'SoftwareSourceCode'} | translated_metadata = {'@type': SCHEMA_URI + 'SoftwareSourceCode'} | ||||
for k, v in content_dict.items(): | for k, v in content_dict.items(): | ||||
# First, check if there is a specific translation | # First, check if there is a specific translation | ||||
# method for this key | # method for this key | ||||
try: | |||||
translation_method = getattr( | translation_method = getattr( | ||||
self, 'translate_' + k.replace('-', '_'), None) | self, 'translate_' + self._normalize_key(k), None) | ||||
except UnknownNamespace: | |||||
continue | |||||
if translation_method: | if translation_method: | ||||
translation_method(translated_metadata, v) | translation_method(translated_metadata, v) | ||||
elif k in self.mapping: | elif k in self.mapping: | ||||
# if there is no method, but the key is known from the | # if there is no method, but the key is known from the | ||||
# crosswalk table | # crosswalk table | ||||
codemeta_key = self.mapping[k] | codemeta_key = self.mapping[k] | ||||
# if there is a normalization method, use it on the value | # if there is a normalization method, use it on the value | ||||
normalization_method = getattr( | normalization_method = getattr( | ||||
self, 'normalize_' + k.replace('-', '_'), None) | self, 'normalize_' + self._normalize_key(k), None) | ||||
if normalization_method: | if normalization_method: | ||||
v = normalization_method(v) | v = normalization_method(v) | ||||
elif k in self.string_fields and isinstance(v, str): | elif k in self.string_fields and isinstance(v, str): | ||||
pass | pass | ||||
elif k in self.string_fields and isinstance(v, list): | elif k in self.string_fields and isinstance(v, list): | ||||
v = [x for x in v if isinstance(x, str)] | v = [x for x in v if isinstance(x, str)] | ||||
else: | else: | ||||
continue | continue | ||||
▲ Show 20 Lines • Show All 202 Lines • ▼ Show 20 Lines | class CodemetaMapping(SingleFileMapping): | ||||
def translate(self, content):
    """Translate raw file content into normalized metadata.

    Args:
        content (bytes): raw content of the file; it is decoded and
            parsed as JSON.

    Returns:
        the result of ``self.normalize_translation`` applied to the
        expanded document, or None if any step fails (undecodable bytes,
        invalid JSON, expansion error, ...).
    """
    try:
        return self.normalize_translation(expand(
            json.loads(content.decode())))
    except Exception:
        # Deliberately best-effort: any failure on an ill-formed file
        # yields "no metadata" rather than an error.
        return None
# XML namespace of Maven POM documents.
POM_XMLNS = 'http://maven.apache.org/POM/4.0.0'
# Namespace prefix in ElementTree's ``{uri}localname`` notation; it is
# prepended to every element name looked up in a converted POM dict.
POM_PREFIX = '{' + POM_XMLNS + '}'
@register_mapping
class MavenMapping(DictMapping, SingleFileMapping):
    """
    dedicated class for Maven (pom.xml) mapping and translation
    """
    name = 'maven'
    filename = b'pom.xml'
    # Crosswalk table as published, keyed by bare POM element names.
    raw_mapping = CROSSWALK_TABLE['Java (Maven)']
    # Same table, keyed by namespace-qualified element names, which is how
    # keys appear in the dict produced by ``schema.to_dict``.
    mapping = {POM_PREFIX + pom_name: codemeta_name
               for (pom_name, codemeta_name)
               in CROSSWALK_TABLE['Java (Maven)'].items()}
    string_fields = [POM_PREFIX + k
                     for k in ['name', 'version', 'description', 'email']]

    # XSD used to convert the parsed XML tree into nested dicts/lists.
    schema = xmlschema.XMLSchema(os.path.join(
        os.path.dirname(__file__), 'data', 'maven', 'maven-4.0.0.xsd'),
        defuse='always')

    def _normalize_key(self, key):
        """Strips the POM namespace prefix from ``key`` and normalizes the
        remainder like the parent class does.

        Raises:
            UnknownNamespace: if ``key`` is not in the POM namespace.
        """
        if not key.startswith(POM_PREFIX):
            raise UnknownNamespace(key)
        key = key[len(POM_PREFIX):]  # strip the prefix
        return super()._normalize_key(key)

    def translate(self, content):
        """Translates the raw content of a pom.xml.

        Args:
            content (bytes): raw content of the file

        Returns:
            the normalized translation of the metadata, or None if the
            content could not be parsed.
        """
        try:
            tree = defusedxml.ElementTree.fromstring(content)
        except defusedxml.ElementTree.ParseError:
            self.log.warning('Error parsing XML from %s', self.log_suffix)
            return None
        except UnicodeDecodeError:
            self.log.warning('Error unidecoding XML from %s', self.log_suffix)
            return None
        except (LookupError, ValueError):
            # unknown encoding or multi-byte encoding
            self.log.warning('Error detecting XML encoding from %s',
                             self.log_suffix)
            return None
        d = self.schema.to_dict(tree, validation='skip')
        d = d or {}  # it may be None if the document is empty but for the root
        if not isinstance(d, dict):
            # Validation is skipped, so nothing guarantees the decoded root
            # is a mapping; every step below needs one.
            self.log.warning('Skipping ill-formed XML content from %s',
                             self.log_suffix)
            return None
        metadata = self._translate_dict(d, normalize=False)
        metadata[SCHEMA_URI+'codeRepository'] = self.parse_repositories(d)
        metadata[SCHEMA_URI+'license'] = self.parse_licenses(d)
        return self.normalize_translation(metadata)

    # Repository assumed when the POM declares none.
    _default_repository = {
        POM_PREFIX + 'url': 'https://repo.maven.apache.org/maven2/'}

    def parse_repositories(self, d):
        """https://maven.apache.org/pom.html#Repositories

        Returns a list of ``{'@id': url}`` dicts, one per usable
        repository, or None if there is none.

        >>> tree = defusedxml.ElementTree.fromstring('''
        ... <project xmlns="http://maven.apache.org/POM/4.0.0">
        ...   <repositories>
        ...     <repository>
        ...       <id>codehausSnapshots</id>
        ...       <name>Codehaus Snapshots</name>
        ...       <url>http://snapshots.maven.codehaus.org/maven2</url>
        ...       <layout>default</layout>
        ...     </repository>
        ...   </repositories>
        ... </project>
        ... ''')
        >>> d = MavenMapping.schema.to_dict(tree)
        >>> MavenMapping().parse_repositories(d)
        """
        repositories = d.get(POM_PREFIX + 'repositories')
        if not repositories:
            results = [self.parse_repository(d, self._default_repository)]
        elif isinstance(repositories, dict):
            repositories = repositories.get(POM_PREFIX + 'repository') or []
            if not isinstance(repositories, list):
                repositories = [repositories]
            results = [self.parse_repository(d, repo)
                       for repo in repositories]
        else:
            results = []
        return [res for res in results if res] or None

    def parse_repository(self, d, repo):
        """Builds the artifact URL inside one repository.

        Args:
            d (dict): the whole converted POM (read for groupId/artifactId)
            repo (dict): one converted ``<repository>`` element

        Returns:
            ``{'@id': url}``, or None if ``repo`` is not a dict, uses a
            non-default layout, or a required field is not a string.
        """
        if not isinstance(repo, dict):
            return
        # Keys are namespace-qualified like every other element name; with
        # the bare 'layout' key this check could never match.
        if repo.get(POM_PREFIX + 'layout', 'default') != 'default':
            return  # TODO ?
        url = repo.get(POM_PREFIX + 'url')
        group_id = d.get(POM_PREFIX + 'groupId')
        artifact_id = d.get(POM_PREFIX + 'artifactId')
        if (isinstance(url, str) and isinstance(group_id, str)
                and isinstance(artifact_id, str)):
            repo = os.path.join(url, *group_id.split('.'), artifact_id)
            return {"@id": repo}

    def normalize_groupId(self, id_):
        """https://maven.apache.org/pom.html#Maven_Coordinates

        >>> MavenMapping().normalize_groupId('org.example')
        {'@id': 'org.example'}
        """
        if isinstance(id_, str):
            return {"@id": id_}

    def parse_licenses(self, d):
        """https://maven.apache.org/pom.html#Licenses

        >>> import json
        >>> tree = defusedxml.ElementTree.fromstring('''
        ... <project xmlns="http://maven.apache.org/POM/4.0.0">
        ...   <licenses>
        ...     <license>
        ...       <name>Apache License, Version 2.0</name>
        ...       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...     </license>
        ...   </licenses>
        ... </project>
        ... ''')
        >>> d = MavenMapping.schema.to_dict(tree)
        >>> print(json.dumps(d, indent=4, sort_keys=True))
        {
            "{http://maven.apache.org/POM/4.0.0}licenses": {
                "{http://maven.apache.org/POM/4.0.0}license": [
                    {
                        "{http://maven.apache.org/POM/4.0.0}name": "Apache License, Version 2.0",
                        "{http://maven.apache.org/POM/4.0.0}url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
                    }
                ]
            }
        }
        >>> MavenMapping().parse_licenses(d)
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]

        or, if there are more than one license:

        >>> from pprint import pprint
        >>> tree = defusedxml.ElementTree.fromstring('''
        ... <project xmlns="http://maven.apache.org/POM/4.0.0">
        ...   <licenses>
        ...     <license>
        ...       <name>Apache License, Version 2.0</name>
        ...       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
        ...     </license>
        ...     <license>
        ...       <name>MIT License</name>
        ...       <url>https://opensource.org/licenses/MIT</url>
        ...     </license>
        ...   </licenses>
        ... </project>
        ... ''')
        >>> d = MavenMapping.schema.to_dict(tree)
        >>> pprint(MavenMapping().parse_licenses(d))
        [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
         {'@id': 'https://opensource.org/licenses/MIT'}]
        """  # noqa: E501
        licenses = d.get(POM_PREFIX + 'licenses')
        if not isinstance(licenses, dict):
            return
        licenses = licenses.get(POM_PREFIX + 'license')
        if isinstance(licenses, dict):
            # a single <license> may be decoded as a dict, not a list
            licenses = [licenses]
        elif not isinstance(licenses, list):
            return
        # Keep only entries that carry a string URL.
        return [{"@id": lic[POM_PREFIX + 'url']}
                for lic in licenses
                if isinstance(lic, dict)
                and isinstance(lic.get(POM_PREFIX + 'url'), str)] or None
# Key normalizer: lower-cases the key. Presumably applied to PKG-INFO
# header names; the caller is not visible in this view.
_normalize_pkginfo_key = str.lower
class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy): | class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy): | ||||
def header_fetch_parse(self, name, value): | def header_fetch_parse(self, name, value): | ||||
if hasattr(value, 'name'): | if hasattr(value, 'name'): | ||||
▲ Show 20 Lines • Show All 167 Lines • Show Last 20 Lines |