Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import re | import re | ||||
import abc | import abc | ||||
import ast | |||||
import json | import json | ||||
import logging | import logging | ||||
import email.parser | import email.parser | ||||
import xmltodict | import xmltodict | ||||
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | ||||
from swh.indexer.codemeta import compact, expand | from swh.indexer.codemeta import compact, expand | ||||
# Registry of mapping instances, keyed by the name of their class.
MAPPINGS = {}


def register_mapping(cls):
    """Class decorator recording an instance of `cls` in `MAPPINGS`.

    The class is instantiated without arguments and the instance is stored
    under the class name. The class itself is returned unchanged, so the
    decorated name still refers to the class.
    """
    instance = cls()
    MAPPINGS[cls.__name__] = instance
    return cls
def merge_values(v1, v2):
    """Merge two translated values into a single one.

    If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`,
    returns `{"@list": l1 + l2}`.

    Otherwise, make them lists (if they are not already) and concatenate
    them.

    Raises:
        ValueError: if only one of the values is a `{"@list": ...}` dict,
            or if the content of an `@list` is not a Python list.

    >>> merge_values('a', 'b')
    ['a', 'b']
    >>> merge_values(['a', 'b'], 'c')
    ['a', 'b', 'c']
    >>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']})
    {'@list': ['a', 'b', 'c']}
    """
    if isinstance(v1, dict) and set(v1) == {'@list'}:
        if not (isinstance(v2, dict) and set(v2) == {'@list'}):
            raise ValueError('Cannot merge %r and %r' % (v1, v2))
        l1 = v1['@list']
        l2 = v2['@list']
        # Explicit check rather than `assert`: assertions are stripped
        # under `python -O`, which would let e.g. two strings be silently
        # concatenated here.
        if not isinstance(l1, list) or not isinstance(l2, list):
            raise ValueError('Cannot merge %r and %r' % (v1, v2))
        return {'@list': l1 + l2}

    # v1 is not an ordered list; refuse to mix it with one.
    if isinstance(v2, dict) and '@list' in v2:
        raise ValueError('Cannot merge %r and %r' % (v1, v2))
    if not isinstance(v1, list):
        v1 = [v1]
    if not isinstance(v2, list):
        v2 = [v2]
    return v1 + v2
class BaseMapping(metaclass=abc.ABCMeta): | class BaseMapping(metaclass=abc.ABCMeta): | ||||
"""Base class for mappings to inherit from | """Base class for mappings to inherit from | ||||
To implement a new mapping: | To implement a new mapping: | ||||
- inherit this class | - inherit this class | ||||
- override translate function | - override translate function | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def translate_dict(self, content_dict, *, normalize=True): | ||||
# method for this key | # method for this key | ||||
translation_method = getattr( | translation_method = getattr( | ||||
self, 'translate_' + k.replace('-', '_'), None) | self, 'translate_' + k.replace('-', '_'), None) | ||||
if translation_method: | if translation_method: | ||||
translation_method(translated_metadata, v) | translation_method(translated_metadata, v) | ||||
elif k in self.mapping: | elif k in self.mapping: | ||||
# if there is no method, but the key is known from the | # if there is no method, but the key is known from the | ||||
# crosswalk table | # crosswalk table | ||||
codemeta_key = self.mapping[k] | |||||
# if there is a normalization method, use it on the value | # if there is a normalization method, use it on the value | ||||
normalization_method = getattr( | normalization_method = getattr( | ||||
self, 'normalize_' + k.replace('-', '_'), None) | self, 'normalize_' + k.replace('-', '_'), None) | ||||
if normalization_method: | if normalization_method: | ||||
v = normalization_method(v) | v = normalization_method(v) | ||||
# set the translation metadata with the normalized value | # set the translation metadata with the normalized value | ||||
translated_metadata[self.mapping[k]] = v | if codemeta_key in translated_metadata: | ||||
translated_metadata[codemeta_key] = merge_values( | |||||
translated_metadata[codemeta_key], v) | |||||
else: | |||||
translated_metadata[codemeta_key] = v | |||||
if normalize: | if normalize: | ||||
return self.normalize_translation(translated_metadata) | return self.normalize_translation(translated_metadata) | ||||
else: | else: | ||||
return translated_metadata | return translated_metadata | ||||
douardda: Why a method? self is unused, so a bare function is fine here.
Which then allows you to very… | |||||
class JsonMapping(DictMapping, SingleFileMapping): | class JsonMapping(DictMapping, SingleFileMapping): | ||||
"""Base class for all mappings that use a JSON file as input.""" | """Base class for all mappings that use a JSON file as input.""" | ||||
def translate(self, raw_content): | def translate(self, raw_content): | ||||
""" | """ | ||||
Translates content by parsing content from a bytestring containing | Translates content by parsing content from a bytestring containing | ||||
json data and translating with the appropriate mapping | json data and translating with the appropriate mapping | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | def translate(self, content): | ||||
SCHEMA_URI+'name': | SCHEMA_URI+'name': | ||||
metadata.pop(SCHEMA_URI+'author', [None])[0], | metadata.pop(SCHEMA_URI+'author', [None])[0], | ||||
SCHEMA_URI+'email': | SCHEMA_URI+'email': | ||||
metadata.pop(SCHEMA_URI+'email', [None])[0], | metadata.pop(SCHEMA_URI+'email', [None])[0], | ||||
}] | }] | ||||
} | } | ||||
return self.normalize_translation(metadata) | return self.normalize_translation(metadata) | ||||
def translate_summary(self, translated_metadata, v):
    """Append `v` to the list stored under the key mapped from 'summary'.

    The target key is looked up in `self.mapping`; the list is created in
    `translated_metadata` if the key is not present yet.
    """
    target = self.mapping['summary']
    if target not in translated_metadata:
        translated_metadata[target] = []
    translated_metadata[target].append(v)
def translate_description(self, translated_metadata, v):
    """Append `v` to the list stored under the key mapped from 'description'.

    The target key is looked up in `self.mapping`; the list is created in
    `translated_metadata` if the key is not present yet.
    """
    target = self.mapping['description']
    if target not in translated_metadata:
        translated_metadata[target] = []
    translated_metadata[target].append(v)
def normalize_home_page(self, urls):
    """Wrap every item of `urls` in a `{'@id': ...}` node, keeping order."""
    nodes = []
    for home_page in urls:
        nodes.append({'@id': home_page})
    return nodes
def normalize_license(self, licenses):
    """Wrap every item of `licenses` in a `{'@id': ...}` node, keeping order."""
    nodes = []
    for license_id in licenses:
        nodes.append({'@id': license_id})
    return nodes
@register_mapping
class GemspecMapping(DictMapping):
    """Mapping for Ruby `.gemspec` files.

    The file is not evaluated as Ruby: after the `Gem::Specification.new`
    line, each `<receiver>.<key> = <value>` line whose value happens to be
    a valid Python literal (strings, lists of strings, ...) is collected
    into a dict, which is then handed to `translate_dict`.
    """
    _re_spec_new = re.compile(r'.*Gem::Specification.new do \|.*\|.*')
    _re_spec_entry = re.compile(r'\s*\w+\.(?P<key>\w+)\s*=\s*(?P<value>.*)')
    # A `.freeze` call applied to a literal, i.e. directly following a
    # closing quote, bracket or parenthesis. Anchoring on the preceding
    # character avoids mangling string *contents* that contain '.freeze'
    # (e.g. a URL).
    _re_freeze = re.compile(r'''(?<=["'\])])\.freeze\b''')

    mapping = CROSSWALK_TABLE['Ruby Gem']

    def detect_metadata_files(self, file_entries):
        """Return the sha1 of the first entry named `*.gemspec`, in a list.

        Returns an empty list if no entry matches.
        """
        for entry in file_entries:
            if entry['name'].endswith(b'.gemspec'):
                return [entry['sha1']]
        return []

    def translate(self, raw_content):
        """Translate the bytes of a gemspec file.

        Returns the result of `translate_dict` on the extracted entries,
        or None if the content cannot be decoded or contains no
        `Gem::Specification.new` line.
        """
        try:
            raw_content = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning('Error unidecoding %r', raw_content)
            return

        # Skip lines before 'Gem::Specification.new'. The same iterator is
        # reused below so parsing resumes right after that line.
        lines = iter(raw_content.split('\n'))
        for line in lines:
            if self._re_spec_new.match(line):
                break
        else:
            self.log.warning('Could not find Gem::Specification in %r',
                             raw_content)
            return

        content_dict = {}
        for line in lines:
            match = self._re_spec_entry.match(line)
            if not match:
                continue
            raw_value = self._re_freeze.sub('', match.group('value'))
            try:
                # We're parsing Ruby expressions here, but Python's
                # ast.literal_eval is rather good at parsing simple
                # Ruby expressions (mainly strings delimited with " or ',
                # and lists of such strings).
                value = ast.literal_eval(raw_value)
            except (SyntaxError, ValueError, TypeError,
                    MemoryError, RecursionError):
                # Obviously, ast.literal_eval won't work on any Ruby code.
                # All the exception types it documents are caught, so that
                # one odd line does not abort the whole file.
                continue
            content_dict[match.group('key')] = value
        return self.translate_dict(content_dict)

    def normalize_homepage(self, s):
        """Wrap the homepage value in an `{"@id": ...}` node."""
        return {"@id": s}

    def normalize_license(self, s):
        """Turn a license name into a one-item list of SPDX URL nodes.

        Returns None if `s` is not a string.
        """
        if isinstance(s, str):
            return [{"@id": "https://spdx.org/licenses/" + s}]
        return None

    def normalize_licenses(self, licenses):
        """Turn a list of license names into a list of SPDX URL nodes.

        Items that are not strings are dropped. Returns None if `licenses`
        is not a list.
        """
        if isinstance(licenses, list):
            return [{"@id": "https://spdx.org/licenses/" + license_name}
                    for license_name in licenses
                    if isinstance(license_name, str)]
        return None

    def normalize_author(self, author):
        """Wrap a single author name in an `{"@list": [...]}` node.

        A value that is not a string yields an empty `@list`, so the
        result can always be merged with the one of `normalize_authors`.
        """
        if isinstance(author, str):
            return {"@list": [author]}
        return {"@list": []}

    def normalize_authors(self, authors):
        """Wrap author names in an `{"@list": [...]}` node.

        A bare string is treated as a single author; items that are not
        strings are dropped; any other value yields an empty `@list`. The
        `@list` content is therefore always a Python list.
        """
        if isinstance(authors, str):
            authors = [authors]
        elif not isinstance(authors, list):
            authors = []
        return {"@list": [name for name in authors
                          if isinstance(name, str)]}
def main():
    """Run two registered mappings on sample content and print the results.

    Both samples are bytes literals, since the translators are documented
    as taking a bytestring.
    """
    raw_content = b"""{"name": "test_name", "unknown_term": "ut"}"""
    # NOTE(review): this sample is JSON but is given to MavenMapping;
    # confirm that this mapping is meant to be exercised with JSON input.
    raw_content1 = b"""{"name": "test_name",
                        "unknown_term": "ut",
                        "prerequisites" :"packageXYZ"}"""
    result = MAPPINGS["NpmMapping"].translate(raw_content)
    result1 = MAPPINGS["MavenMapping"].translate(raw_content1)
    print(result)
    print(result1)


if __name__ == "__main__":
    main()
Why a method? Since self is unused, a bare function is fine here,
which also makes it very easy to write unit tests for it 😄