# swh/indexer/metadata_dictionary.py
# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import abc
import csv
import json
import logging
import os.path

import swh.indexer
# Path of the CodeMeta crosswalk CSV shipped alongside the swh.indexer
# package (data/codemeta/crosswalk.csv relative to the package directory).
CROSSWALK_TABLE_PATH = os.path.join(os.path.dirname(swh.indexer.__file__),
                                    'data', 'codemeta', 'crosswalk.csv')
def read_crosstable(fd):
    """Parse the CodeMeta crosswalk table from an open CSV file.

    The CSV header names the columns: four of them ('Parent Type',
    'Property', 'Type', 'Description') describe the canonical CodeMeta
    property, and every other column names a metadata source
    (eg. 'NodeJS', 'codemeta-V1').

    Args:
        fd: an open text file object containing the crosswalk CSV.

    Returns:
        dict: maps each data-source name to a dict translating that
        source's local property names to the canonical ('Property') name.

    Raises:
        ValueError: if the file is empty (no header row).
    """
    reader = csv.reader(fd)
    try:
        header = next(reader)
    except StopIteration:
        raise ValueError('empty file')

    # Every column that is not part of the canonical property description
    # is a data source whose local names must be translated.
    data_sources = set(header) - {'Parent Type', 'Property',
                                  'Type', 'Description'}
    assert 'codemeta-V1' in data_sources

    codemeta_translation = {data_source: {} for data_source in data_sources}

    for line in reader:  # For each canonical name
        canonical_name = dict(zip(header, line))['Property']
        for (col, value) in zip(header, line):  # For each cell in the row
            if col in data_sources:
                # A cell may list several local names separated by '/',
                # all mapping to the same canonical name.
                for local_name in value.split('/'):
                    if local_name.strip():
                        codemeta_translation[col][local_name.strip()] = \
                            canonical_name

    return codemeta_translation
# Load the crosswalk table once, at import time.
with open(CROSSWALK_TABLE_PATH) as fd:
    CROSSWALK_TABLE = read_crosstable(fd)


# Maps mapping class names to singleton instances; populated by the
# @register_mapping class decorator below.
MAPPINGS = {}
def register_mapping(cls):
    """Class decorator: instantiate *cls* once and record the instance
    in the module-level MAPPINGS registry, keyed by the class name."""
    MAPPINGS[cls.__name__] = cls()
    return cls
class BaseMapping(metaclass=abc.ABCMeta):
    """Base class for mappings to inherit from

    To implement a new mapping:

    - inherit this class
    - override translate function
    """
    def __init__(self):
        # One logger per concrete subclass, named after its module and
        # class name.
        self.log = logging.getLogger('%s.%s' % (
            self.__class__.__module__,
            self.__class__.__name__))

    @abc.abstractmethod
    def detect_metadata_files(self, file_entries):
        """
        Detects files potentially containing metadata

        Args:
            file_entries (list): list of files

        Returns:
            list: empty list if nothing was found, list of sha1 otherwise
        """
        pass

    @abc.abstractmethod
    def translate(self, file_content):
        """Translate the raw file content into canonical metadata."""
        pass
class DictMapping(BaseMapping): | |||||
"""Base class for mappings that take as input a file that is mostly | |||||
a key-value store (eg. a shallow JSON dict).""" | |||||
@property | |||||
@abc.abstractmethod | |||||
def mapping(self): | |||||
"""A translation dict to map dict keys into a canonical name.""" | |||||
pass | |||||
def translate_dict(self, content_dict): | |||||
""" | """ | ||||
Tranlsates content by parsing content to a json object | Translates content by parsing content from a dict object | ||||
and translating with the npm mapping (for now hard_coded mapping) | and translating with the appropriate mapping | ||||
Args: | Args: | ||||
context_text (text): should be json | content_dict (dict) | ||||
Returns: | Returns: | ||||
dict: translated metadata in jsonb form needed for the indexer | dict: translated metadata in json-friendly form needed for | ||||
the indexer | |||||
""" | """ | ||||
translated_metadata = {} | translated_metadata = {} | ||||
default = 'other' | default = 'other' | ||||
translated_metadata['other'] = {} | translated_metadata['other'] = {} | ||||
try: | try: | ||||
for k, v in content_dict.items(): | for k, v in content_dict.items(): | ||||
try: | try: | ||||
Show All 11 Lines | def translate_dict(self, content_dict): | ||||
if isinstance(translated_metadata[term], dict): | if isinstance(translated_metadata[term], dict): | ||||
translated_metadata[term][k] = v | translated_metadata[term][k] = v | ||||
continue | continue | ||||
except KeyError: | except KeyError: | ||||
self.log.exception( | self.log.exception( | ||||
"Problem during item mapping") | "Problem during item mapping") | ||||
continue | continue | ||||
except Exception: | except Exception: | ||||
raise | |||||
return None | return None | ||||
return translated_metadata | return translated_metadata | ||||
class JsonMapping(DictMapping):
    """Base class for all mappings that use a JSON file as input."""

    @property
    @abc.abstractmethod
    def filename(self):
        """The .json file to extract metadata from."""
        pass

    def detect_metadata_files(self, file_entries):
        # Return the sha1 of the first entry whose name matches this
        # mapping's metadata filename, or an empty list if none does.
        for entry in file_entries:
            if entry['name'] == self.filename:
                return [entry['sha1']]
        return []

    def translate(self, raw_content):
        """
        Translates content by parsing content from a bytestring containing
        json data and translating with the appropriate mapping

        Args:
            raw_content (bytes): raw content to translate

        Returns:
            dict: translated metadata in json-friendly form needed for
                  the indexer, or None if the content could not be
                  decoded or parsed
        """
        try:
            raw_content = raw_content.decode()
        except UnicodeDecodeError:
            self.log.warning('Error unidecoding %r', raw_content)
            return
        try:
            content_dict = json.loads(raw_content)
        except json.JSONDecodeError:
            # Lazy %-args, consistent with the decode-error log above.
            self.log.warning('Error unjsoning %r', raw_content)
            return
        return self.translate_dict(content_dict)
@register_mapping
class NpmMapping(JsonMapping):
    """
    dedicated class for NPM (package.json) mapping and translation
    """
    # Translation table comes from the 'NodeJS' column of the CodeMeta
    # crosswalk.
    mapping = CROSSWALK_TABLE['NodeJS']
    filename = b'package.json'
@register_mapping
class CodemetaMapping(JsonMapping):
    """
    dedicated class for CodeMeta (codemeta.json) mapping and translation
    """
    # Translation table comes from the 'codemeta-V1' column of the
    # CodeMeta crosswalk.
    mapping = CROSSWALK_TABLE['codemeta-V1']
    filename = b'codemeta.json'
def main():
    """Quick manual check: translate two sample package descriptions.

    Inputs are bytes because JsonMapping.translate calls .decode() on
    its argument; a str would raise AttributeError.
    """
    raw_content = b"""{"name": "test_name", "unknown_term": "ut"}"""
    raw_content1 = b"""{"name": "test_name",
        "unknown_term": "ut",
        "prerequisites" :"packageXYZ"}"""
    result = MAPPINGS["NpmMapping"].translate(raw_content)
    # Only NpmMapping and CodemetaMapping are registered; looking up
    # "MavenMapping" would raise KeyError.
    result1 = MAPPINGS["CodemetaMapping"].translate(raw_content1)
    print(result)
    print(result1)


if __name__ == "__main__":
    main()
# Review note (moranegg): This is a really nice way to do it