Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary.py
Show First 20 Lines • Show All 121 Lines • ▼ Show 20 Lines | def detect_metadata_files(cls, file_entries): | ||||
return [entry['sha1']] | return [entry['sha1']] | ||||
return [] | return [] | ||||
class DictMapping(BaseMapping): | class DictMapping(BaseMapping): | ||||
"""Base class for mappings that take as input a file that is mostly | """Base class for mappings that take as input a file that is mostly | ||||
a key-value store (eg. a shallow JSON dict).""" | a key-value store (eg. a shallow JSON dict).""" | ||||
string_fields = [] | |||||
'''List of fields that are simple strings, and don't need any | |||||
normalization.''' | |||||
@property
@abc.abstractmethod
def mapping(self):
    """A translation dict to map dict keys into a canonical name.

    Declared abstract; the body is intentionally empty, so reading the
    property on this base implementation yields None.
    """
def _translate_dict(self, content_dict, *, normalize=True): | def _translate_dict(self, content_dict, *, normalize=True): | ||||
""" | """ | ||||
Show All 21 Lines | def _translate_dict(self, content_dict, *, normalize=True): | ||||
# crosswalk table | # crosswalk table | ||||
codemeta_key = self.mapping[k] | codemeta_key = self.mapping[k] | ||||
# if there is a normalization method, use it on the value | # if there is a normalization method, use it on the value | ||||
normalization_method = getattr( | normalization_method = getattr( | ||||
self, 'normalize_' + k.replace('-', '_'), None) | self, 'normalize_' + k.replace('-', '_'), None) | ||||
if normalization_method: | if normalization_method: | ||||
v = normalization_method(v) | v = normalization_method(v) | ||||
elif k in self.string_fields and isinstance(v, str): | |||||
pass | |||||
elif k in self.string_fields and isinstance(v, list): | |||||
v = [x for x in v if isinstance(x, str)] | |||||
else: | |||||
continue | |||||
# set the translation metadata with the normalized value | # set the translation metadata with the normalized value | ||||
if codemeta_key in translated_metadata: | if codemeta_key in translated_metadata: | ||||
translated_metadata[codemeta_key] = merge_values( | translated_metadata[codemeta_key] = merge_values( | ||||
translated_metadata[codemeta_key], v) | translated_metadata[codemeta_key], v) | ||||
else: | else: | ||||
translated_metadata[codemeta_key] = v | translated_metadata[codemeta_key] = v | ||||
if normalize: | if normalize: | ||||
Show All 35 Lines | |||||
@register_mapping | @register_mapping | ||||
class NpmMapping(JsonMapping): | class NpmMapping(JsonMapping): | ||||
""" | """ | ||||
dedicated class for NPM (package.json) mapping and translation | dedicated class for NPM (package.json) mapping and translation | ||||
""" | """ | ||||
name = 'npm' | name = 'npm' | ||||
mapping = CROSSWALK_TABLE['NodeJS'] | mapping = CROSSWALK_TABLE['NodeJS'] | ||||
filename = b'package.json' | filename = b'package.json' | ||||
string_fields = ['name', 'version', 'homepage', 'description', 'email'] | |||||
_schema_shortcuts = { | _schema_shortcuts = { | ||||
'github': 'git+https://github.com/%s.git', | 'github': 'git+https://github.com/%s.git', | ||||
'gist': 'git+https://gist.github.com/%s.git', | 'gist': 'git+https://gist.github.com/%s.git', | ||||
'gitlab': 'git+https://gitlab.com/%s.git', | 'gitlab': 'git+https://gitlab.com/%s.git', | ||||
# Bitbucket supports both hg and git, and the shortcut does not | # Bitbucket supports both hg and git, and the shortcut does not | ||||
# tell which one to use. | # tell which one to use. | ||||
# 'bitbucket': 'https://bitbucket.org/', | # 'bitbucket': 'https://bitbucket.org/', | ||||
Show All 9 Lines | def normalize_repository(self, d): | ||||
{'@id': 'git+https://example.org/foo.git'} | {'@id': 'git+https://example.org/foo.git'} | ||||
>>> NpmMapping().normalize_repository( | >>> NpmMapping().normalize_repository( | ||||
... 'gitlab:foo/bar') | ... 'gitlab:foo/bar') | ||||
{'@id': 'git+https://gitlab.com/foo/bar.git'} | {'@id': 'git+https://gitlab.com/foo/bar.git'} | ||||
>>> NpmMapping().normalize_repository( | >>> NpmMapping().normalize_repository( | ||||
... 'foo/bar') | ... 'foo/bar') | ||||
{'@id': 'git+https://github.com/foo/bar.git'} | {'@id': 'git+https://github.com/foo/bar.git'} | ||||
""" | """ | ||||
if isinstance(d, dict) and {'type', 'url'} <= set(d): | if isinstance(d, dict) and isinstance(d.get('type'), str) \ | ||||
and isinstance(d.get('url'), str): | |||||
url = '{type}+{url}'.format(**d) | url = '{type}+{url}'.format(**d) | ||||
elif isinstance(d, str): | elif isinstance(d, str): | ||||
if '://' in d: | if '://' in d: | ||||
url = d | url = d | ||||
elif ':' in d: | elif ':' in d: | ||||
(schema, rest) = d.split(':', 1) | (schema, rest) = d.split(':', 1) | ||||
if schema in self._schema_shortcuts: | if schema in self._schema_shortcuts: | ||||
url = self._schema_shortcuts[schema] % rest | url = self._schema_shortcuts[schema] % rest | ||||
Show All 14 Lines | def normalize_bugs(self, d): | ||||
... 'url': 'https://example.org/bugs/', | ... 'url': 'https://example.org/bugs/', | ||||
... 'email': 'bugs@example.org' | ... 'email': 'bugs@example.org' | ||||
... }) | ... }) | ||||
{'@id': 'https://example.org/bugs/'} | {'@id': 'https://example.org/bugs/'} | ||||
>>> NpmMapping().normalize_bugs( | >>> NpmMapping().normalize_bugs( | ||||
... 'https://example.org/bugs/') | ... 'https://example.org/bugs/') | ||||
{'@id': 'https://example.org/bugs/'} | {'@id': 'https://example.org/bugs/'} | ||||
""" | """ | ||||
if isinstance(d, dict) and 'url' in d: | if isinstance(d, dict) and isinstance(d.get('url'), str): | ||||
return {'@id': '{url}'.format(**d)} | return {'@id': d['url']} | ||||
elif isinstance(d, str): | elif isinstance(d, str): | ||||
return {'@id': d} | return {'@id': d} | ||||
else: | else: | ||||
return None | return None | ||||
_parse_author = re.compile(r'^ *' | _parse_author = re.compile(r'^ *' | ||||
r'(?P<name>.*?)' | r'(?P<name>.*?)' | ||||
r'( +<(?P<email>.*)>)?' | r'( +<(?P<email>.*)>)?' | ||||
Show All 28 Lines | def normalize_author(self, d): | ||||
url = d.get('url', None) | url = d.get('url', None) | ||||
elif isinstance(d, str): | elif isinstance(d, str): | ||||
match = self._parse_author.match(d) | match = self._parse_author.match(d) | ||||
name = match.group('name') | name = match.group('name') | ||||
email = match.group('email') | email = match.group('email') | ||||
url = match.group('url') | url = match.group('url') | ||||
else: | else: | ||||
return None | return None | ||||
if name: | if name and isinstance(name, str): | ||||
author[SCHEMA_URI+'name'] = name | author[SCHEMA_URI+'name'] = name | ||||
if email: | if email and isinstance(email, str): | ||||
author[SCHEMA_URI+'email'] = email | author[SCHEMA_URI+'email'] = email | ||||
if url: | if url and isinstance(url, str): | ||||
author[SCHEMA_URI+'url'] = {'@id': url} | author[SCHEMA_URI+'url'] = {'@id': url} | ||||
return {"@list": [author]} | return {"@list": [author]} | ||||
def normalize_license(self, s):
    """https://docs.npmjs.com/files/package.json#license

    Turn a license identifier string into a reference to the matching
    page under https://spdx.org/licenses/. Any non-string value gives
    None.

    >>> NpmMapping().normalize_license('MIT')
    {'@id': 'https://spdx.org/licenses/MIT'}
    """
    if not isinstance(s, str):
        return None
    spdx_url = "https://spdx.org/licenses/" + s
    return {"@id": spdx_url}
def normalize_homepage(self, s):
    """https://docs.npmjs.com/files/package.json#homepage

    Wrap a homepage string in an ``@id`` node. Any non-string value
    gives None.

    >>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
    {'@id': 'https://example.org/~john.doe'}
    """
    return {"@id": s} if isinstance(s, str) else None
def normalize_keywords(self, l):
    """https://docs.npmjs.com/files/package.json#keywords

    Keep only the string entries of a keyword list, preserving their
    order. Anything that is not a list gives None.

    >>> NpmMapping().normalize_keywords(['foo', 'bar'])
    ['foo', 'bar']
    >>> NpmMapping().normalize_keywords(['foo', 42, None, 'bar'])
    ['foo', 'bar']
    >>> NpmMapping().normalize_keywords('foo') is None
    True
    """
    if not isinstance(l, list):
        return None
    return [x for x in l if isinstance(x, str)]
@register_mapping
class CodemetaMapping(SingleFileMapping):
    """
    dedicated class for CodeMeta (codemeta.json) mapping and translation
    """
    name = 'codemeta'
    filename = b'codemeta.json'
    string_fields = ['name', 'version', 'url', 'description', 'email']

    def translate(self, content):
        """Decode, parse, expand and normalize a codemeta.json payload.

        ``content`` is decoded with ``.decode()`` and parsed as JSON; the
        result goes through ``expand`` and ``normalize_translation``.
        Returns the normalized translation, or None if any of those
        steps raises.
        """
        try:
            document = json.loads(content.decode())
            expanded = expand(document)
            return self.normalize_translation(expanded)
        except Exception:
            # Best-effort by design: any failure while decoding, parsing,
            # expanding or normalizing means "no metadata" for this file.
            return None
@register_mapping | @register_mapping | ||||
class MavenMapping(DictMapping, SingleFileMapping): | class MavenMapping(DictMapping, SingleFileMapping): | ||||
""" | """ | ||||
dedicated class for Maven (pom.xml) mapping and translation | dedicated class for Maven (pom.xml) mapping and translation | ||||
""" | """ | ||||
name = 'maven' | name = 'maven' | ||||
filename = b'pom.xml' | filename = b'pom.xml' | ||||
mapping = CROSSWALK_TABLE['Java (Maven)'] | mapping = CROSSWALK_TABLE['Java (Maven)'] | ||||
string_fields = ['name', 'version', 'description', 'email'] | |||||
def translate(self, content): | def translate(self, content): | ||||
try: | try: | ||||
d = xmltodict.parse(content).get('project') or {} | d = xmltodict.parse(content).get('project') or {} | ||||
except xml.parsers.expat.ExpatError: | except xml.parsers.expat.ExpatError: | ||||
self.log.warning('Error parsing XML from %s', self.log_suffix) | self.log.warning('Error parsing XML from %s', self.log_suffix) | ||||
return None | return None | ||||
except UnicodeDecodeError: | except UnicodeDecodeError: | ||||
Show All 26 Lines | def parse_repositories(self, d): | ||||
... </repository> | ... </repository> | ||||
... </repositories> | ... </repositories> | ||||
... ''') | ... ''') | ||||
>>> MavenMapping().parse_repositories(d) | >>> MavenMapping().parse_repositories(d) | ||||
""" | """ | ||||
repositories = d.get('repositories') | repositories = d.get('repositories') | ||||
if not repositories: | if not repositories: | ||||
results = [self.parse_repository(d, self._default_repository)] | results = [self.parse_repository(d, self._default_repository)] | ||||
else: | elif isinstance(repositories, dict): | ||||
repositories = repositories.get('repository') or [] | repositories = repositories.get('repository') or [] | ||||
if not isinstance(repositories, list): | if not isinstance(repositories, list): | ||||
repositories = [repositories] | repositories = [repositories] | ||||
results = [self.parse_repository(d, repo) | results = [self.parse_repository(d, repo) | ||||
for repo in repositories] | for repo in repositories] | ||||
else: | |||||
results = [] | |||||
return [res for res in results if res] or None | return [res for res in results if res] or None | ||||
def parse_repository(self, d, repo):
    """Build the ``@id`` reference of an artifact inside one repository.

    Args:
        d: mapping from which ``groupId`` and ``artifactId`` are read.
        repo: one repository entry; its ``url`` and optional ``layout``
            keys are read.

    Returns:
        ``{"@id": "<url>/<groupId with dots as slashes>/<artifactId>"}``,
        or None when ``repo`` is not a dict, uses a non-default layout,
        or when any of the three components is not a string.
    """
    # Function-scope import: the result is a URL, so its segments must be
    # joined with '/', whatever the host OS path separator is.
    import posixpath

    if not isinstance(repo, dict):
        return None
    if repo.get('layout', 'default') != 'default':
        return None  # TODO ?
    url = repo.get('url')
    group_id = d.get('groupId')
    artifact_id = d.get('artifactId')
    if not (isinstance(url, str) and isinstance(group_id, str)
            and isinstance(artifact_id, str)):
        return None
    location = posixpath.join(url, *group_id.split('.'), artifact_id)
    return {"@id": location}
def normalize_groupId(self, id_):
    """https://maven.apache.org/pom.html#Maven_Coordinates

    Wrap a string group identifier in an ``@id`` node. Any non-string
    value gives None.

    >>> MavenMapping().normalize_groupId('org.example')
    {'@id': 'org.example'}
    """
    return {"@id": id_} if isinstance(id_, str) else None
def parse_licenses(self, d): | def parse_licenses(self, d): | ||||
"""https://maven.apache.org/pom.html#Licenses | """https://maven.apache.org/pom.html#Licenses | ||||
>>> import xmltodict | >>> import xmltodict | ||||
>>> import json | >>> import json | ||||
>>> d = xmltodict.parse(''' | >>> d = xmltodict.parse(''' | ||||
... <licenses> | ... <licenses> | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | def parse_licenses(self, d): | ||||
return | return | ||||
licenses = licenses.get('license') | licenses = licenses.get('license') | ||||
if isinstance(licenses, dict): | if isinstance(licenses, dict): | ||||
licenses = [licenses] | licenses = [licenses] | ||||
elif not isinstance(licenses, list): | elif not isinstance(licenses, list): | ||||
return | return | ||||
return [{"@id": license['url']} | return [{"@id": license['url']} | ||||
for license in licenses | for license in licenses | ||||
if isinstance(license, dict) and 'url' in license] or None | if isinstance(license, dict) | ||||
and isinstance(license.get('url'), str)] or None | |||||
_normalize_pkginfo_key = str.lower | _normalize_pkginfo_key = str.lower | ||||
class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy):
    """Email policy whose fetched header values keep their line breaks."""

    def header_fetch_parse(self, name, value):
        """Return the header object for ``value`` with newlines kept.

        A ``value`` that already has a ``name`` attribute is returned
        untouched. Otherwise each newline-plus-space sequence is reduced
        to a bare newline, and the result is passed to
        ``self.header_factory`` together with ``name``.
        """
        already_parsed = hasattr(value, 'name')
        if not already_parsed:
            unindented = value.replace('\n ', '\n')
            return self.header_factory(name, unindented)
        return value
@register_mapping | @register_mapping | ||||
class PythonPkginfoMapping(DictMapping, SingleFileMapping): | class PythonPkginfoMapping(DictMapping, SingleFileMapping): | ||||
"""Dedicated class for Python's PKG-INFO mapping and translation. | """Dedicated class for Python's PKG-INFO mapping and translation. | ||||
https://www.python.org/dev/peps/pep-0314/""" | https://www.python.org/dev/peps/pep-0314/""" | ||||
name = 'pkg-info' | name = 'pkg-info' | ||||
filename = b'PKG-INFO' | filename = b'PKG-INFO' | ||||
mapping = {_normalize_pkginfo_key(k): v | mapping = {_normalize_pkginfo_key(k): v | ||||
for (k, v) in CROSSWALK_TABLE['Python PKG-INFO'].items()} | for (k, v) in CROSSWALK_TABLE['Python PKG-INFO'].items()} | ||||
string_fields = ['name', 'version', 'description', 'summary', | |||||
'author', 'author-email'] | |||||
_parser = email.parser.BytesHeaderParser( | _parser = email.parser.BytesHeaderParser( | ||||
policy=LinebreakPreservingEmailPolicy()) | policy=LinebreakPreservingEmailPolicy()) | ||||
def translate(self, content): | def translate(self, content): | ||||
msg = self._parser.parsebytes(content) | msg = self._parser.parsebytes(content) | ||||
d = {} | d = {} | ||||
for (key, value) in msg.items(): | for (key, value) in msg.items(): | ||||
Show All 19 Lines | class PythonPkginfoMapping(DictMapping, SingleFileMapping): | ||||
def normalize_license(self, licenses):
    """Wrap every entry of ``licenses`` in its own ``@id`` node.

    The entries are used as-is, in iteration order.
    """
    nodes = []
    for entry in licenses:
        nodes.append({'@id': entry})
    return nodes
@register_mapping | @register_mapping | ||||
class GemspecMapping(DictMapping): | class GemspecMapping(DictMapping): | ||||
name = 'gemspec' | name = 'gemspec' | ||||
mapping = CROSSWALK_TABLE['Ruby Gem'] | mapping = CROSSWALK_TABLE['Ruby Gem'] | ||||
string_fields = ['name', 'version', 'description', 'summary', 'email'] | |||||
_re_spec_new = re.compile(r'.*Gem::Specification.new +(do|\{) +\|.*\|.*') | _re_spec_new = re.compile(r'.*Gem::Specification.new +(do|\{) +\|.*\|.*') | ||||
_re_spec_entry = re.compile(r'\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)') | _re_spec_entry = re.compile(r'\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)') | ||||
@classmethod | @classmethod | ||||
def detect_metadata_files(cls, file_entries): | def detect_metadata_files(cls, file_entries): | ||||
for entry in file_entries: | for entry in file_entries: | ||||
if entry['name'].endswith(b'.gemspec'): | if entry['name'].endswith(b'.gemspec'): | ||||
▲ Show 20 Lines • Show All 107 Lines • Show Last 20 Lines |