Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
# Copyright (C) 2017 The Software Heritage developers | # Copyright (C) 2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | |||||
import csv | import csv | ||||
import json | import json | ||||
import os.path | import os.path | ||||
import logging | |||||
import swh.indexer | import swh.indexer | ||||
CROSSWALK_TABLE_PATH = os.path.join(os.path.dirname(swh.indexer.__file__), | CROSSWALK_TABLE_PATH = os.path.join(os.path.dirname(swh.indexer.__file__), | ||||
'data', 'codemeta', 'crosswalk.csv') | 'data', 'codemeta', 'crosswalk.csv') | ||||
def read_crosstable(fd): | def read_crosstable(fd): | ||||
Show All 23 Lines | def read_crosstable(fd): | ||||
return codemeta_translation | return codemeta_translation | ||||
with open(CROSSWALK_TABLE_PATH) as fd: | with open(CROSSWALK_TABLE_PATH) as fd: | ||||
CROSSWALK_TABLE = read_crosstable(fd) | CROSSWALK_TABLE = read_crosstable(fd) | ||||
def convert(raw_content): | MAPPINGS = {} | ||||
""" | |||||
convert raw_content recursively: | |||||
- from bytes to string | |||||
- from string to dict | |||||
Args: | |||||
raw_content (bytes / string / dict) | |||||
Returns: | def register_mapping(cls): | ||||
dict: content (if string was json, otherwise returns string) | MAPPINGS[cls.__name__] = cls() | ||||
return cls | |||||
""" | |||||
if isinstance(raw_content, bytes): | |||||
return convert(raw_content.decode()) | |||||
if isinstance(raw_content, str): | |||||
try: | |||||
content = json.loads(raw_content) | |||||
if content: | |||||
return content | |||||
else: | |||||
return raw_content | |||||
except json.decoder.JSONDecodeError: | |||||
return raw_content | |||||
if isinstance(raw_content, dict): | |||||
return raw_content | |||||
class BaseMapping(): | class BaseMapping(metaclass=abc.ABCMeta): | ||||
"""Base class for mappings to inherit from | """Base class for mappings to inherit from | ||||
To implement a new mapping: | To implement a new mapping: | ||||
- inherit this class | - inherit this class | ||||
- add a local property self.mapping | |||||
- override translate function | - override translate function | ||||
""" | """ | ||||
def __init__(self): | |||||
self.log = logging.getLogger('%s.%s' % ( | |||||
self.__class__.__module__, | |||||
self.__class__.__name__)) | |||||
def translate(self, content_dict): | @abc.abstractmethod | ||||
moranegg: This is a really nice way to do it | |||||
def detect_metadata_files(self, files): | |||||
""" | """ | ||||
Translates content by parsing content to a json object | Detects files potentially containing metadata | ||||
and translating with the npm mapping (for now hard_coded mapping) | Args: | ||||
- file_entries (list): list of files | |||||
Returns: | |||||
- empty list if nothing was found | |||||
- list of sha1 otherwise | |||||
""" | |||||
pass | |||||
@abc.abstractmethod | |||||
def translate(self, file_content): | |||||
pass | |||||
class DictMapping(BaseMapping): | |||||
"""Base class for mappings that take as input a file that is mostly | |||||
a key-value store (eg. a shallow JSON dict).""" | |||||
@property | |||||
@abc.abstractmethod | |||||
def mapping(self): | |||||
"""A translation dict to map dict keys into a canonical name.""" | |||||
pass | |||||
def translate_dict(self, content_dict): | |||||
""" | |||||
Translates content by parsing content from a dict object | |||||
and translating with the appropriate mapping | |||||
Args: | Args: | ||||
context_text (text): should be json | content_dict (dict) | ||||
Returns: | Returns: | ||||
dict: translated metadata in jsonb form needed for the indexer | dict: translated metadata in json-friendly form needed for | ||||
the indexer | |||||
""" | """ | ||||
translated_metadata = {} | translated_metadata = {} | ||||
default = 'other' | default = 'other' | ||||
translated_metadata['other'] = {} | translated_metadata['other'] = {} | ||||
try: | try: | ||||
for k, v in content_dict.items(): | for k, v in content_dict.items(): | ||||
try: | try: | ||||
Show All 11 Lines | def translate_dict(self, content_dict): | ||||
if isinstance(translated_metadata[term], dict): | if isinstance(translated_metadata[term], dict): | ||||
translated_metadata[term][k] = v | translated_metadata[term][k] = v | ||||
continue | continue | ||||
except KeyError: | except KeyError: | ||||
self.log.exception( | self.log.exception( | ||||
"Problem during item mapping") | "Problem during item mapping") | ||||
continue | continue | ||||
except Exception: | except Exception: | ||||
raise | |||||
return None | return None | ||||
return translated_metadata | return translated_metadata | ||||
class NpmMapping(BaseMapping): | class JsonMapping(DictMapping): | ||||
""" | """Base class for all mappings that use a JSON file as input.""" | ||||
dedicated class for NPM (package.json) mapping and translation | |||||
""" | |||||
mapping = CROSSWALK_TABLE['NodeJS'] | |||||
def translate(self, raw_content): | |||||
content_dict = convert(raw_content) | |||||
return super().translate(content_dict) | |||||
@property | |||||
class MavenMapping(BaseMapping): | @abc.abstractmethod | ||||
""" | def filename(self): | ||||
dedicated class for Maven (pom.xml) mapping and translation | """The .json file to extract metadata from.""" | ||||
""" | pass | ||||
mapping = CROSSWALK_TABLE['Java (Maven)'] | |||||
def detect_metadata_files(self, file_entries): | |||||
def translate(self, raw_content): | for entry in file_entries: | ||||
content = convert(raw_content) | if entry['name'] == self.filename: | ||||
# parse content from xml to dict | return [entry['sha1']] | ||||
return super().translate(content) | return [] | ||||
class DoapMapping(BaseMapping): | |||||
mapping = { | |||||
} | |||||
def translate(self, raw_content): | def translate(self, raw_content): | ||||
content = convert(raw_content) | |||||
# parse content from xml to dict | |||||
return super().translate(content) | |||||
def parse_xml(content): | |||||
""" | """ | ||||
Parses content from xml to a python dict | Translates content by parsing content from a bytestring containing | ||||
json data and translating with the appropriate mapping | |||||
Args: | Args: | ||||
- content (text): the string form of the raw_content ( in xml) | raw_content: bytes | ||||
Returns: | Returns: | ||||
- parsed_xml (dict): a python dict of the content after parsing | dict: translated metadata in json-friendly form needed for | ||||
""" | the indexer | ||||
# check if xml | |||||
# use xml parser to dict | |||||
return content | |||||
mapping_tool_fn = { | """ | ||||
"npm": NpmMapping(), | try: | ||||
"maven": MavenMapping(), | raw_content = raw_content.decode() | ||||
"doap_xml": DoapMapping() | except UnicodeDecodeError: | ||||
} | self.log.warning('Error unidecoding %r', raw_content) | ||||
return | |||||
try: | |||||
content_dict = json.loads(raw_content) | |||||
except json.JSONDecodeError: | |||||
self.log.warning('Error unjsoning %r' % raw_content) | |||||
return | |||||
return self.translate_dict(content_dict) | |||||
def compute_metadata(context, raw_content): | @register_mapping | ||||
class NpmMapping(JsonMapping): | |||||
""" | """ | ||||
first landing method: a dispatcher that sends content | dedicated class for NPM (package.json) mapping and translation | ||||
to the right function to carry out the real parsing of syntax | """ | ||||
and translation of terms | mapping = CROSSWALK_TABLE['NodeJS'] | ||||
filename = b'package.json' | |||||
Args: | |||||
context (text): defines to which function/tool the content is sent | |||||
content (text): the string form of the raw_content | |||||
Returns: | |||||
dict: translated metadata jsonb dictionary needed for the indexer to | |||||
store in storage | |||||
@register_mapping | |||||
class CodemetaMapping(JsonMapping): | |||||
""" | """ | ||||
if raw_content is None or raw_content is b"": | dedicated class for CodeMeta (codemeta.json) mapping and translation | ||||
return None | """ | ||||
mapping = CROSSWALK_TABLE['codemeta-V1'] | |||||
# TODO: keep mapping not in code (maybe fetch crosswalk from storage?) | filename = b'codemeta.json' | ||||
# if fetched from storage should be done once for batch of sha1s | |||||
dictionary = mapping_tool_fn[context] | |||||
translated_metadata = dictionary.translate(raw_content) | |||||
return translated_metadata | |||||
def main(): | def main(): | ||||
raw_content = """{"name": "test_name", "unknown_term": "ut"}""" | raw_content = """{"name": "test_name", "unknown_term": "ut"}""" | ||||
raw_content1 = b"""{"name": "test_name", | raw_content1 = b"""{"name": "test_name", | ||||
"unknown_term": "ut", | "unknown_term": "ut", | ||||
"prerequisites" :"packageXYZ"}""" | "prerequisites" :"packageXYZ"}""" | ||||
result = compute_metadata("npm", raw_content) | result = MAPPINGS["NpmMapping"].translate(raw_content) | ||||
result1 = compute_metadata("maven", raw_content1) | result1 = MAPPINGS["MavenMapping"].translate(raw_content1) | ||||
print(result) | print(result) | ||||
print(result1) | print(result1) | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |
This is a really nice way to do it