diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 4169937..462d6b9 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,184 +1,184 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
from typing import Any, Dict, List, Optional
from typing_extensions import TypedDict
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
from swh.indexer.storage.interface import Sha1
class DirectoryLsEntry(TypedDict):
target: Sha1
sha1: Sha1
name: bytes
type: str
class BaseMapping:
"""Base class for mappings to inherit from
To implement a new mapping:
- inherit this class
- override translate function
"""
def __init__(self, log_suffix=""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
)
@property
def name(self):
"""A name of this mapping, used as an identifier in the
indexer storage."""
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
"""
Returns the sha1 hashes of files which can be translated by this mapping
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
def translate(self, file_content: bytes) -> Optional[Dict]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata)
class SingleFileMapping(BaseMapping):
"""Base class for all mappings that use a single file as input."""
@property
def filename(self):
"""The .json file to extract metadata from."""
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
for entry in file_entries:
if entry["name"].lower() == cls.filename:
return [entry["sha1"]]
return []
class DictMapping(BaseMapping):
"""Base class for mappings that take as input a file that is mostly
a key-value store (eg. a shallow JSON dict)."""
string_fields = [] # type: List[str]
"""List of fields that are simple strings, and don't need any
normalization."""
@property
def mapping(self):
"""A translation dict to map dict keys into a canonical name."""
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls):
return {
term
for (key, term) in cls.mapping.items()
if key in cls.string_fields
or hasattr(cls, "translate_" + cls._normalize_method_name(key))
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
def _translate_dict(
self, content_dict: Dict, *, normalize: bool = True
) -> Dict[str, str]:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
Args:
content_dict (dict): content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}
for k, v in content_dict.items():
# First, check if there is a specific translation
# method for this key
translation_method = getattr(
self, "translate_" + self._normalize_method_name(k), None
)
if translation_method:
translation_method(translated_metadata, v)
elif k in self.mapping:
# if there is no method, but the key is known from the
# crosswalk table
codemeta_key = self.mapping[k]
# if there is a normalization method, use it on the value
normalization_method = getattr(
self, "normalize_" + self._normalize_method_name(k), None
)
if normalization_method:
v = normalization_method(v)
elif k in self.string_fields and isinstance(v, str):
pass
elif k in self.string_fields and isinstance(v, list):
v = [x for x in v if isinstance(x, str)]
else:
continue
# set the translation metadata with the normalized value
if codemeta_key in translated_metadata:
translated_metadata[codemeta_key] = merge_values(
translated_metadata[codemeta_key], v
)
else:
translated_metadata[codemeta_key] = v
if normalize:
return self.normalize_translation(translated_metadata)
else:
return translated_metadata
-class JsonMapping(DictMapping, SingleFileMapping):
- """Base class for all mappings that use a JSON file as input."""
+class JsonMapping(DictMapping):
+ """Base class for all mappings that use JSON data as input."""
def translate(self, raw_content: bytes) -> Optional[Dict]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
Args:
raw_content (bytes): raw content to translate
Returns:
dict: translated metadata in json-friendly form needed for
the indexer
"""
try:
raw_content_string: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
return None
try:
content_dict = json.loads(raw_content_string)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
return None
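
The base.py change above decouples JSON parsing (JsonMapping) from single-file detection (SingleFileMapping). A minimal sketch of what the split enables follows; the subclass and the example.json filename are hypothetical and only illustrate the interfaces defined in this diff.

# Hypothetical sketch (not part of this diff): a file-based JSON mapping now
# mixes in both bases, while a mapping that obtains its JSON from somewhere
# other than a single named file can inherit JsonMapping alone.
from swh.indexer.codemeta import CROSSWALK_TABLE
from swh.indexer.metadata_dictionary.base import JsonMapping, SingleFileMapping


class ExampleJsonFileMapping(JsonMapping, SingleFileMapping):
    """Hypothetical mapping for an imaginary example.json manifest."""

    name = "example"
    mapping = CROSSWALK_TABLE["NodeJS"]  # reusing an existing crosswalk purely for the sketch
    filename = b"example.json"  # consumed by SingleFileMapping.detect_metadata_files()
    string_fields = ["name", "description"]
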
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
index 467866d..2b3916a 100644
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -1,228 +1,228 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
-from .base import JsonMapping
+from .base import JsonMapping, SingleFileMapping
-class NpmMapping(JsonMapping):
+class NpmMapping(JsonMapping, SingleFileMapping):
"""
dedicated class for NPM (package.json) mapping and translation
"""
name = "npm"
mapping = CROSSWALK_TABLE["NodeJS"]
filename = b"package.json"
string_fields = ["name", "version", "homepage", "description", "email"]
_schema_shortcuts = {
"github": "git+https://github.com/%s.git",
"gist": "git+https://gist.github.com/%s.git",
"gitlab": "git+https://gitlab.com/%s.git",
# Bitbucket supports both hg and git, and the shortcut does not
# tell which one to use.
# 'bitbucket': 'https://bitbucket.org/',
}
def normalize_repository(self, d):
"""https://docs.npmjs.com/files/package.json#repository
>>> NpmMapping().normalize_repository({
... 'type': 'git',
... 'url': 'https://example.org/foo.git'
... })
{'@id': 'git+https://example.org/foo.git'}
>>> NpmMapping().normalize_repository(
... 'gitlab:foo/bar')
{'@id': 'git+https://gitlab.com/foo/bar.git'}
>>> NpmMapping().normalize_repository(
... 'foo/bar')
{'@id': 'git+https://github.com/foo/bar.git'}
"""
if (
isinstance(d, dict)
and isinstance(d.get("type"), str)
and isinstance(d.get("url"), str)
):
url = "{type}+{url}".format(**d)
elif isinstance(d, str):
if "://" in d:
url = d
elif ":" in d:
(schema, rest) = d.split(":", 1)
if schema in self._schema_shortcuts:
url = self._schema_shortcuts[schema] % rest
else:
return None
else:
url = self._schema_shortcuts["github"] % d
else:
return None
return {"@id": url}
def normalize_bugs(self, d):
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
... 'url': 'https://example.org/bugs/',
... 'email': 'bugs@example.org'
... })
{'@id': 'https://example.org/bugs/'}
>>> NpmMapping().normalize_bugs(
... 'https://example.org/bugs/')
{'@id': 'https://example.org/bugs/'}
"""
if isinstance(d, dict) and isinstance(d.get("url"), str):
return {"@id": d["url"]}
elif isinstance(d, str):
return {"@id": d}
else:
return None
_parse_author = re.compile(
r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$"
)
def normalize_author(self, d):
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
>>> pprint(NpmMapping().normalize_author({
... 'name': 'John Doe',
... 'email': 'john.doe@example.org',
... 'url': 'https://example.org/~john.doe',
... }))
{'@list': [{'@type': 'http://schema.org/Person',
'http://schema.org/email': 'john.doe@example.org',
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
>>> pprint(NpmMapping().normalize_author(
... 'John Doe <john.doe@example.org> (https://example.org/~john.doe)'
... ))
{'@list': [{'@type': 'http://schema.org/Person',
'http://schema.org/email': 'john.doe@example.org',
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
""" # noqa
author = {"@type": SCHEMA_URI + "Person"}
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
url = d.get("url", None)
elif isinstance(d, str):
match = self._parse_author.match(d)
if not match:
return None
name = match.group("name")
email = match.group("email")
url = match.group("url")
else:
return None
if name and isinstance(name, str):
author[SCHEMA_URI + "name"] = name
if email and isinstance(email, str):
author[SCHEMA_URI + "email"] = email
if url and isinstance(url, str):
author[SCHEMA_URI + "url"] = {"@id": url}
return {"@list": [author]}
def normalize_description(self, description):
r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat common
mistake that causes issues in the database because of null bytes in JSON.
>>> NpmMapping().normalize_description("foo bar")
'foo bar'
>>> NpmMapping().normalize_description(
... "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00"
... )
'foo bar'
>>> NpmMapping().normalize_description(
... "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 "
... )
'foo bar'
>>> NpmMapping().normalize_description(
... # invalid UTF-16 and meaningless UTF-8:
... "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00"
... ) is None
True
>>> NpmMapping().normalize_description(
... # ditto (it looks like little-endian at first)
... "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00"
... ) is None
True
>>> NpmMapping().normalize_description(None) is None
True
"""
if not isinstance(description, str):
return None
# XXX: if this function ever needs to support more cases, consider
# switching to https://pypi.org/project/ftfy/ instead of adding more hacks
if description.startswith("\ufffd\ufffd") and "\x00" in description:
# 2 unicode replacement characters followed by '# ' encoded as UTF-16
# is a common mistake, which indicates a README.md was saved as UTF-16,
# and some NPM tool opened it as UTF-8 and used the first line as
# description.
description_bytes = description.encode()
# Strip the two unicode replacement characters
assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd")
description_bytes = description_bytes[6:]
# If the following attempts fail to recover the description, discard it
# entirely because the current indexer storage backend (postgresql) cannot
# store zero bytes in JSON columns.
description = None
if not description_bytes.startswith(b"\x00"):
# try UTF-16 little-endian (the most common) first
try:
description = description_bytes.decode("utf-16le")
except UnicodeDecodeError:
pass
if description is None:
# if it fails, try UTF-16 big-endian
try:
description = description_bytes.decode("utf-16be")
except UnicodeDecodeError:
pass
if description:
if description.startswith("# "):
description = description[2:]
return description.rstrip()
return description
def normalize_license(self, s):
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
{'@id': 'https://spdx.org/licenses/MIT'}
"""
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
def normalize_homepage(self, s):
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
{'@id': 'https://example.org/~john.doe'}
"""
if isinstance(s, str):
return {"@id": s}
def normalize_keywords(self, lst):
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
['foo', 'bar']
"""
if isinstance(lst, list):
return [x for x in lst if isinstance(x, str)]
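
A rough usage sketch of the updated NpmMapping follows; the sample package.json content is invented for illustration, and the output is only described in a comment since exact values depend on the CodeMeta crosswalk.

# Hypothetical usage sketch (not part of this diff): NpmMapping now gets its
# JSON handling from JsonMapping and its package.json detection from
# SingleFileMapping.
import json

from swh.indexer.metadata_dictionary.npm import NpmMapping

raw = json.dumps(
    {
        "name": "example-package",  # invented sample data
        "version": "1.0.0",
        "repository": "gitlab:foo/bar",
        "license": "MIT",
    }
).encode()

metadata = NpmMapping().translate(raw)
# Expected shape: a compacted CodeMeta document with @type SoftwareSourceCode,
# plus name, version, license, and repository entries (the repository shortcut
# expanding to {'@id': 'git+https://gitlab.com/foo/bar.git'} as in the doctest
# above).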