Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
Show All 22 Lines | |||||
MAPPINGS = {} | MAPPINGS = {} | ||||
def register_mapping(cls): | def register_mapping(cls): | ||||
MAPPINGS[cls.__name__] = cls() | MAPPINGS[cls.__name__] = cls() | ||||
return cls | return cls | ||||
def merge_values(v1, v2): | |||||
"""If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`, | |||||
returns `{"@list": l1 + l2}`. | |||||
Otherwise, make them lists (if they are not already) and concatenate | |||||
them. | |||||
>>> merge_values('a', 'b') | |||||
['a', 'b'] | |||||
>>> merge_values(['a', 'b'], 'c') | |||||
['a', 'b', 'c'] | |||||
>>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']}) | |||||
{'@list': ['a', 'b', 'c']} | |||||
""" | |||||
if isinstance(v1, dict) and set(v1) == {'@list'}: | |||||
assert isinstance(v1['@list'], list) | |||||
if isinstance(v2, dict) and set(v2) == {'@list'}: | |||||
assert isinstance(v2['@list'], list) | |||||
return {'@list': v1['@list'] + v2['@list']} | |||||
else: | |||||
raise ValueError('Cannot merge %r and %r' % (v1, v2)) | |||||
else: | |||||
if isinstance(v2, dict) and '@list' in v2: | |||||
raise ValueError('Cannot merge %r and %r' % (v1, v2)) | |||||
if not isinstance(v1, list): | |||||
v1 = [v1] | |||||
if not isinstance(v2, list): | |||||
v2 = [v2] | |||||
return v1 + v2 | |||||
class BaseMapping(metaclass=abc.ABCMeta): | class BaseMapping(metaclass=abc.ABCMeta): | ||||
"""Base class for mappings to inherit from | """Base class for mappings to inherit from | ||||
To implement a new mapping: | To implement a new mapping: | ||||
- inherit this class | - inherit this class | ||||
- override translate function | - override translate function | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def translate_dict(self, content_dict, *, normalize=True): | ||||
# method for this key | # method for this key | ||||
translation_method = getattr( | translation_method = getattr( | ||||
self, 'translate_' + k.replace('-', '_'), None) | self, 'translate_' + k.replace('-', '_'), None) | ||||
if translation_method: | if translation_method: | ||||
translation_method(translated_metadata, v) | translation_method(translated_metadata, v) | ||||
elif k in self.mapping: | elif k in self.mapping: | ||||
# if there is no method, but the key is known from the | # if there is no method, but the key is known from the | ||||
# crosswalk table | # crosswalk table | ||||
codemeta_key = self.mapping[k] | |||||
# if there is a normalization method, use it on the value | # if there is a normalization method, use it on the value | ||||
normalization_method = getattr( | normalization_method = getattr( | ||||
self, 'normalize_' + k.replace('-', '_'), None) | self, 'normalize_' + k.replace('-', '_'), None) | ||||
if normalization_method: | if normalization_method: | ||||
v = normalization_method(v) | v = normalization_method(v) | ||||
# set the translation metadata with the normalized value | # set the translation metadata with the normalized value | ||||
translated_metadata[self.mapping[k]] = v | if codemeta_key in translated_metadata: | ||||
translated_metadata[codemeta_key] = merge_values( | |||||
translated_metadata[codemeta_key], v) | |||||
else: | |||||
translated_metadata[codemeta_key] = v | |||||
if normalize: | if normalize: | ||||
return self.normalize_translation(translated_metadata) | return self.normalize_translation(translated_metadata) | ||||
else: | else: | ||||
return translated_metadata | return translated_metadata | ||||
douardda: Why a method? self is unused, so a bare function is fine here.
Which then allows you to very… | |||||
class JsonMapping(DictMapping, SingleFileMapping): | class JsonMapping(DictMapping, SingleFileMapping): | ||||
"""Base class for all mappings that use a JSON file as input.""" | """Base class for all mappings that use a JSON file as input.""" | ||||
def translate(self, raw_content): | def translate(self, raw_content): | ||||
""" | """ | ||||
Translates content by parsing content from a bytestring containing | Translates content by parsing content from a bytestring containing | ||||
json data and translating with the appropriate mapping | json data and translating with the appropriate mapping | ||||
▲ Show 20 Lines • Show All 326 Lines • ▼ Show 20 Lines | def translate(self, content): | ||||
SCHEMA_URI+'name': | SCHEMA_URI+'name': | ||||
metadata.pop(SCHEMA_URI+'author', [None])[0], | metadata.pop(SCHEMA_URI+'author', [None])[0], | ||||
SCHEMA_URI+'email': | SCHEMA_URI+'email': | ||||
metadata.pop(SCHEMA_URI+'email', [None])[0], | metadata.pop(SCHEMA_URI+'email', [None])[0], | ||||
}] | }] | ||||
} | } | ||||
return self.normalize_translation(metadata) | return self.normalize_translation(metadata) | ||||
def translate_summary(self, translated_metadata, v): | |||||
k = self.mapping['summary'] | |||||
translated_metadata.setdefault(k, []).append(v) | |||||
def translate_description(self, translated_metadata, v): | |||||
k = self.mapping['description'] | |||||
translated_metadata.setdefault(k, []).append(v) | |||||
def normalize_home_page(self, urls): | def normalize_home_page(self, urls): | ||||
return [{'@id': url} for url in urls] | return [{'@id': url} for url in urls] | ||||
def normalize_license(self, licenses): | def normalize_license(self, licenses): | ||||
return [{'@id': license} for license in licenses] | return [{'@id': license} for license in licenses] | ||||
@register_mapping | @register_mapping | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | def normalize_license(self, s): | ||||
return [{"@id": "https://spdx.org/licenses/" + s}] | return [{"@id": "https://spdx.org/licenses/" + s}] | ||||
def normalize_licenses(self, licenses): | def normalize_licenses(self, licenses): | ||||
if isinstance(licenses, list): | if isinstance(licenses, list): | ||||
return [{"@id": "https://spdx.org/licenses/" + license} | return [{"@id": "https://spdx.org/licenses/" + license} | ||||
for license in licenses | for license in licenses | ||||
if isinstance(license, str)] | if isinstance(license, str)] | ||||
def translate_author(self, translated_metadata, v): | def normalize_author(self, author): | ||||
k = self.mapping['author'] | return {"@list": [author]} | ||||
translated_metadata.setdefault(k, {"@list": []})["@list"].append(v) | |||||
def normalize_authors(self, authors): | |||||
def translate_authors(self, translated_metadata, v): | return {"@list": authors} | ||||
k = self.mapping['authors'] | |||||
translated_metadata.setdefault(k, {"@list": []})["@list"].extend(v) | |||||
def translate_summary(self, translated_metadata, v): | |||||
k = self.mapping['summary'] | |||||
translated_metadata.setdefault(k, []).append(v) | |||||
def translate_description(self, translated_metadata, v): | |||||
k = self.mapping['description'] | |||||
translated_metadata.setdefault(k, []).append(v) | |||||
def main(): | def main(): | ||||
raw_content = """{"name": "test_name", "unknown_term": "ut"}""" | raw_content = """{"name": "test_name", "unknown_term": "ut"}""" | ||||
raw_content1 = b"""{"name": "test_name", | raw_content1 = b"""{"name": "test_name", | ||||
"unknown_term": "ut", | "unknown_term": "ut", | ||||
"prerequisites" :"packageXYZ"}""" | "prerequisites" :"packageXYZ"}""" | ||||
result = MAPPINGS["NpmMapping"].translate(raw_content) | result = MAPPINGS["NpmMapping"].translate(raw_content) | ||||
result1 = MAPPINGS["MavenMapping"].translate(raw_content1) | result1 = MAPPINGS["MavenMapping"].translate(raw_content1) | ||||
print(result) | print(result) | ||||
print(result1) | print(result1) | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
main() | main() |
Why a method? self is unused, so a bare function is fine here.
Which then allows you to very easily write unit tests for 😄