Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary/ruby.py
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import ast | import ast | ||||
import itertools | import itertools | ||||
import re | import re | ||||
from typing import Any, Dict, List, Optional | |||||
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | ||||
from .base import DictMapping | from .base import DictMapping | ||||
def name_to_person(name: str) -> Dict[str, str]:
    """Wrap a bare name into a ``Person`` node.

    Args:
        name: the person's name.

    Returns:
        A dict whose ``@type`` is ``SCHEMA_URI + "Person"`` and whose
        ``SCHEMA_URI + "name"`` entry is ``name``.
    """
    return {
        "@type": SCHEMA_URI + "Person",
        SCHEMA_URI + "name": name,
    }
class GemspecMapping(DictMapping):
    """Metadata mapping named ``gemspec``, backed by the ``"Ruby Gem"``
    entry of the crosswalk table."""

    name = "gemspec"
    mapping = CROSSWALK_TABLE["Ruby Gem"]
    # NOTE(review): presumably the keys the base class treats as plain
    # strings -- confirm against DictMapping.
    string_fields = ["name", "version", "description", "summary", "email"]

    # Matches the line opening the specification block, e.g.
    # ``Gem::Specification.new do |s|``. The dot before ``new`` is escaped
    # so that it only matches a literal ``.``.
    _re_spec_new = re.compile(r".*Gem::Specification\.new +(do|\{) +\|.*\|.*")
    # Matches an attribute assignment such as ``s.name = "foo"``; ``key`` is
    # the attribute name and ``expr`` the raw right-hand side.
    _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
def detect_metadata_files(cls, file_entries: Any) -> List[str]:
    """Return the ``sha1`` of the first entry whose name ends in ``.gemspec``.

    Args:
        file_entries: iterable of mappings, each with a bytes ``"name"``
            and a ``"sha1"`` entry.

    Returns:
        A one-element list holding the ``"sha1"`` of the first matching
        entry, or an empty list when no entry matches.
    """
    for entry in file_entries:
        # Names are compared as bytes; only the first match is reported.
        if entry["name"].endswith(b".gemspec"):
            return [entry["sha1"]]
    return []
def translate(self, raw_content: bytes) -> Optional[Dict[str, Any]]:
    """Extract the attribute assignments of a gemspec and translate them.

    The content is decoded, every line up to and including the first one
    matching ``_re_spec_new`` is dropped, and each remaining line matching
    ``_re_spec_entry`` has its right-hand side evaluated with
    ``eval_ruby_expression``. Truthy results are collected by attribute
    name (a later assignment to the same attribute overwrites an earlier
    one) and the resulting dict is handed to ``_translate_dict``.

    Args:
        raw_content: undecoded content of the gemspec file.

    Returns:
        Whatever ``_translate_dict`` returns for the collected attributes,
        or ``None`` (after logging a warning) when the content cannot be
        decoded or contains no ``Gem::Specification.new`` line.
    """
    try:
        text = raw_content.decode()
    except UnicodeDecodeError:
        self.log.warning("Error unidecoding from %s", self.log_suffix)
        return None

    # Skip lines before 'Gem::Specification.new'
    lines = itertools.dropwhile(
        lambda x: not self._re_spec_new.match(x), text.split("\n")
    )

    try:
        next(lines)  # Consume 'Gem::Specification.new'
    except StopIteration:
        self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
        return None

    content_dict: Dict[str, Any] = {}
    for line in lines:
        match = self._re_spec_entry.match(line)
        if match:
            value = self.eval_ruby_expression(match.group("expr"))
            # Expressions that could not be evaluated, or that evaluate
            # to an empty/falsy value, are left out.
            if value:
                content_dict[match.group("key")] = value
    return self._translate_dict(content_dict)
def eval_ruby_expression(self, expr): | def eval_ruby_expression(self, expr: str) -> Any: | ||||
"""Very simple evaluator of Ruby expressions. | """Very simple evaluator of Ruby expressions. | ||||
>>> GemspecMapping().eval_ruby_expression('"Foo bar"') | >>> GemspecMapping().eval_ruby_expression('"Foo bar"') | ||||
'Foo bar' | 'Foo bar' | ||||
>>> GemspecMapping().eval_ruby_expression("'Foo bar'") | >>> GemspecMapping().eval_ruby_expression("'Foo bar'") | ||||
'Foo bar' | 'Foo bar' | ||||
>>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']") | >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']") | ||||
['Foo', 'bar'] | ['Foo', 'bar'] | ||||
Show All 19 Lines | def eval_ruby_expression(self, expr: str) -> Any: | ||||
expr = expr.replace(".freeze", "") | expr = expr.replace(".freeze", "") | ||||
try: | try: | ||||
# We're parsing Ruby expressions here, but Python's | # We're parsing Ruby expressions here, but Python's | ||||
# ast.parse works for very simple Ruby expressions | # ast.parse works for very simple Ruby expressions | ||||
# (mainly strings delimited with " or ', and lists | # (mainly strings delimited with " or ', and lists | ||||
# of such strings). | # of such strings). | ||||
tree = ast.parse(expr, mode="eval") | tree = ast.parse(expr, mode="eval") | ||||
except (SyntaxError, ValueError): | except (SyntaxError, ValueError): | ||||
return | return None | ||||
if isinstance(tree, ast.Expression): | if isinstance(tree, ast.Expression): | ||||
return evaluator(tree.body) | return evaluator(tree.body) | ||||
def normalize_homepage(self, s: Any) -> Optional[Dict[str, str]]:
    """Turn a homepage string into an ``@id`` node.

    Args:
        s: the evaluated ``homepage`` value.

    Returns:
        ``{"@id": s}`` when ``s`` is a string, ``None`` otherwise.
    """
    if isinstance(s, str):
        return {"@id": s}
    return None
def normalize_license(self, s: Any) -> Optional[List[Dict[str, str]]]:
    """Turn a single license identifier into a list of one SPDX ``@id`` node.

    Args:
        s: the evaluated ``license`` value.

    Returns:
        A one-element list whose ``@id`` is ``s`` prefixed with
        ``https://spdx.org/licenses/`` when ``s`` is a string, ``None``
        otherwise.
    """
    if isinstance(s, str):
        return [{"@id": "https://spdx.org/licenses/" + s}]
    return None
def normalize_licenses(self, licenses: Any) -> Optional[List[Dict[str, str]]]:
    """Turn a list of license identifiers into SPDX ``@id`` nodes.

    Args:
        licenses: the evaluated ``licenses`` value.

    Returns:
        One ``{"@id": ...}`` node per string item, each prefixed with
        ``https://spdx.org/licenses/`` (non-string items are dropped),
        or ``None`` when ``licenses`` is not a list.
    """
    if isinstance(licenses, list):
        return [
            {"@id": "https://spdx.org/licenses/" + license_id}
            for license_id in licenses
            if isinstance(license_id, str)
        ]
    return None
def normalize_author(self, author: Any) -> Optional[Dict[str, List[Dict[str, str]]]]:
    """Turn a single author name into an ``@list`` of one person node.

    Args:
        author: the evaluated ``author`` value.

    Returns:
        ``{"@list": [name_to_person(author)]}`` when ``author`` is a
        string, ``None`` otherwise.
    """
    if isinstance(author, str):
        return {"@list": [name_to_person(author)]}
    return None
def normalize_authors(self, authors: Any) -> Optional[Dict[str, List[Dict[str, str]]]]:
    """Turn a list of author names into an ``@list`` of person nodes.

    Args:
        authors: the evaluated ``authors`` value.

    Returns:
        ``{"@list": [...]}`` holding ``name_to_person(author)`` for each
        string item, in input order (non-string items are dropped), or
        ``None`` when ``authors`` is not a list.
    """
    if isinstance(authors, list):
        return {
            "@list": [
                name_to_person(author)
                for author in authors
                if isinstance(author, str)
            ]
        }
    return None