Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | |||||
import re | |||||
import abc | import abc | ||||
import csv | |||||
import json | import json | ||||
import os.path | |||||
import logging | import logging | ||||
import xmltodict | |||||
import swh.indexer | from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI | ||||
from swh.indexer.codemeta import compact, expand | |||||
# Location of the CodeMeta crosswalk CSV shipped inside the swh.indexer
# package data.
CROSSWALK_TABLE_PATH = os.path.join(
    os.path.dirname(swh.indexer.__file__),
    'data', 'codemeta', 'crosswalk.csv')
def read_crosstable(fd):
    """Parse the CodeMeta crosswalk CSV into per-data-source lookup tables.

    Args:
        fd: a text file-like object containing the crosswalk CSV; the
            header row names the data sources, plus the meta columns
            'Parent Type', 'Property', 'Type' and 'Description'.

    Returns:
        dict: maps each data-source name to a dict that maps that
        source's own property names to the canonical property name
        (the 'Property' column).

    Raises:
        ValueError: if the file has no header row (empty file).
    """
    rows = csv.reader(fd)
    header = next(rows, None)
    if header is None:
        raise ValueError('empty file')
    meta_columns = {'Parent Type', 'Property', 'Type', 'Description'}
    data_sources = set(header) - meta_columns
    assert 'codemeta-V1' in data_sources
    codemeta_translation = {source: {} for source in data_sources}
    for row in rows:
        # The 'Property' column holds the canonical name for this row.
        canonical_name = dict(zip(header, row))['Property']
        for (column, cell) in zip(header, row):
            if column not in data_sources:
                # Skip the Parent Type/Property/Type/Description columns.
                continue
            # A cell may list several local property names separated by '/'.
            for local_name in cell.split('/'):
                local_name = local_name.strip()
                if local_name:
                    codemeta_translation[column][local_name] = canonical_name
    return codemeta_translation
# Load the crosswalk table once at import time.
with open(CROSSWALK_TABLE_PATH) as crosswalk_file:
    CROSSWALK_TABLE = read_crosstable(crosswalk_file)
# Registry of mapping class name -> singleton instance.
MAPPINGS = {}


def register_mapping(cls):
    """Class decorator: instantiate *cls*, record the instance in
    ``MAPPINGS`` under the class name, and return the class unchanged."""
    instance = cls()
    MAPPINGS[cls.__name__] = instance
    return cls
Show All 24 Lines | def detect_metadata_files(self, files): | ||||
- list of sha1 otherwise | - list of sha1 otherwise | ||||
""" | """ | ||||
pass | pass | ||||
@abc.abstractmethod
def translate(self, file_content):
    """Translate the raw content of a metadata file into canonical
    metadata; must be implemented by concrete mappings."""
    pass
def normalize_translation(self, metadata):
    """Normalize translated metadata by running it through
    ``swh.indexer.codemeta.compact`` (presumably JSON-LD compaction
    against the CodeMeta context — confirm in that module)."""
    return compact(metadata)
class SingleFileMapping(BaseMapping):
    """Base class for all mappings that use a single file as input."""

    @property
    @abc.abstractmethod
    def filename(self):
        """The .json file to extract metadata from."""
        pass

    def detect_metadata_files(self, file_entries):
        """Return ``[sha1]`` of the first entry whose name matches
        :attr:`filename`, or ``[]`` if none matches."""
        matches = (entry['sha1'] for entry in file_entries
                   if entry['name'] == self.filename)
        for sha1 in matches:
            return [sha1]
        return []
class DictMapping(BaseMapping):
    """Base class for mappings that take as input a file that is mostly
    a key-value store (eg. a shallow JSON dict)."""

    @property
    @abc.abstractmethod
    def mapping(self):
        """A translation dict to map dict keys into a canonical name."""
        pass

    def translate_dict(self, content_dict, *, normalize=True):
        """
        Translates content by parsing content from a dict object
        and translating with the appropriate mapping

        Args:
            content_dict (dict): the key-value metadata to translate
            normalize (bool): whether to pass the result through
                :meth:`normalize_translation` before returning it

        Returns:
            dict: translated metadata in json-friendly form needed for
            the indexer
        """
        result = {'@type': SCHEMA_URI + 'SoftwareSourceCode'}
        for key, value in content_dict.items():
            # A 'translate_<key>' method, if defined, takes precedence
            # over the crosswalk table; it updates `result` itself.
            custom_translator = getattr(self, 'translate_' + key, None)
            if custom_translator is not None:
                custom_translator(result, value)
                continue
            if key not in self.mapping:
                # Key unknown to the crosswalk table: drop it.
                continue
            # Optional per-key value normalization hook.
            normalizer = getattr(self, 'normalize_' + key, None)
            if normalizer is not None:
                value = normalizer(value)
            result[self.mapping[key]] = value
        if normalize:
            return self.normalize_translation(result)
        return result
class JsonMapping(DictMapping): | class JsonMapping(DictMapping, SingleFileMapping): | ||||
"""Base class for all mappings that use a JSON file as input.""" | """Base class for all mappings that use a JSON file as input.""" | ||||
@property | |||||
@abc.abstractmethod | |||||
def filename(self): | |||||
"""The .json file to extract metadata from.""" | |||||
pass | |||||
def detect_metadata_files(self, file_entries): | |||||
for entry in file_entries: | |||||
if entry['name'] == self.filename: | |||||
return [entry['sha1']] | |||||
return [] | |||||
def translate(self, raw_content): | def translate(self, raw_content): | ||||
""" | """ | ||||
Translates content by parsing content from a bytestring containing | Translates content by parsing content from a bytestring containing | ||||
json data and translating with the appropriate mapping | json data and translating with the appropriate mapping | ||||
Args: | Args: | ||||
raw_content: bytes | raw_content: bytes | ||||
Show All 18 Lines | |||||
@register_mapping
class NpmMapping(JsonMapping):
    """
    dedicated class for NPM (package.json) mapping and translation
    """
    mapping = CROSSWALK_TABLE['NodeJS']
    filename = b'package.json'

    # Shorthand repository schemes npm recognizes, expanded to full URLs.
    _schema_shortcuts = {
        'github': 'https://github.com/',
        'gist': 'https://gist.github.com/',
        'bitbucket': 'https://bitbucket.org/',
        'gitlab': 'https://gitlab.com/',
    }

    def normalize_repository(self, d):
        """Normalize an npm ``repository`` value (dict or string) to a
        single URL string, or None if it cannot be interpreted.

        See https://docs.npmjs.com/files/package.json#repository
        """
        if isinstance(d, dict):
            return '{type}+{url}'.format(**d)
        if not isinstance(d, str):
            return None
        if '://' in d:
            # Already a full URL.
            return d
        if ':' in d:
            # "<shortcut>:<path>" form, eg. "gitlab:user/repo".
            (scheme, path) = d.split(':', 1)
            shortcut = self._schema_shortcuts.get(scheme)
            if shortcut is None:
                return None
            return shortcut + path
        # Bare "user/repo" strings default to GitHub.
        return self._schema_shortcuts['github'] + d

    def normalize_bugs(self, d):
        """Extract the URL from an npm ``bugs`` dict."""
        return '{url}'.format(**d)

    # Parses npm's 'Name <email> (url)' person shorthand.
    _parse_author = re.compile(r'^ *'
                               r'(?P<name>.*?)'
                               r'( +<(?P<email>.*)>)?'
                               r'( +\((?P<url>.*)\))?'
                               r' *$')

    def normalize_author(self, d):
        """Normalize an npm person field (dict or shorthand string) into
        a schema.org Person dict, or None for other types.

        See https://docs.npmjs.com/files/package.json
        #people-fields-author-contributors
        """
        if isinstance(d, dict):
            parts = (d.get('name', None), d.get('email', None),
                     d.get('url', None))
        elif isinstance(d, str):
            match = self._parse_author.match(d)
            parts = (match.group('name'), match.group('email'),
                     match.group('url'))
        else:
            return None
        author = {'@type': SCHEMA_URI+'Person'}
        for (key, value) in zip(('name', 'email', 'url'), parts):
            if value:
                author[SCHEMA_URI+key] = value
        return author
@register_mapping
class CodemetaMapping(SingleFileMapping):
    """
    dedicated class for CodeMeta (codemeta.json) mapping and translation
    """
    filename = b'codemeta.json'

    def translate(self, content):
        """codemeta.json is already in the CodeMeta vocabulary: decode,
        expand, then normalize (compact) it — no crosswalk needed."""
        doc = json.loads(content.decode())
        return self.normalize_translation(expand(doc))
@register_mapping
class MavenMapping(DictMapping, SingleFileMapping):
    """
    dedicated class for Maven (pom.xml) mapping and translation
    """
    filename = b'pom.xml'
    mapping = CROSSWALK_TABLE['Java (Maven)']

    def translate(self, content):
        """Translate a pom.xml bytestring into normalized metadata.

        Args:
            content (bytes): the raw content of a pom.xml file

        Returns:
            dict: normalized (compacted) translated metadata
        """
        d = xmltodict.parse(content)['project']
        metadata = self.translate_dict(d, normalize=False)
        metadata[SCHEMA_URI+'codeRepository'] = self.parse_repositories(d)
        return self.normalize_translation(metadata)

    # Implicit repository used when a pom declares none.
    _default_repository = {'url': 'https://repo.maven.apache.org/maven2/'}

    def parse_repositories(self, d):
        """Return the list of repository URLs for this project.

        See https://maven.apache.org/pom.html#Repositories
        """
        if 'repositories' not in d:
            return [self.parse_repository(d, self._default_repository)]
        repositories = d['repositories'].get('repository', [])
        # xmltodict yields a single dict (not a list) when there is
        # exactly one <repository> element.
        if not isinstance(repositories, list):
            repositories = [repositories]
        results = []
        for repo in repositories:
            res = self.parse_repository(d, repo)
            if res:
                results.append(res)
        return results

    def parse_repository(self, d, repo):
        """Build the URL of this project inside repository *repo*, or
        None if the repository cannot be used."""
        if repo.get('layout', 'default') != 'default':
            return  # TODO ?
        url = repo.get('url')
        if url is None:
            # Malformed <repository> element without a <url>: skip it
            # instead of raising KeyError.
            return None
        # groupId and artifactId are optional in a pom (they may be
        # inherited from a parent); missing keys must not crash.
        group_id = d.get('groupId')
        if group_id:
            url = os.path.join(url, *group_id.split('.'))
        artifact_id = d.get('artifactId')
        if artifact_id:
            url = os.path.join(url, artifact_id)
        return url
def main():
    """Small manual demo: run two registered mappings on sample inputs.

    Fixes over the previous demo: translate() implementations expect
    bytes (they decode/parse raw file content), so the npm sample must
    be a bytestring; and MavenMapping parses pom.xml (XML), so feeding
    it JSON would crash in xmltodict.parse.
    """
    npm_content = b"""{"name": "test_name", "unknown_term": "ut"}"""
    maven_content = b"""
        <project>
            <name>test_name</name>
            <groupId>com.example</groupId>
            <artifactId>test_name</artifactId>
        </project>"""
    result = MAPPINGS["NpmMapping"].translate(npm_content)
    result1 = MAPPINGS["MavenMapping"].translate(maven_content)
    print(result)
    print(result1)


if __name__ == "__main__":
    main()