Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066280
D7342.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
20 KB
Subscribers
None
D7342.diff
View Options
diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,10 +1,13 @@
import collections
+from typing import DefaultDict, Dict, Set, Type
import click
+from typing_extensions import Final
from . import cff, codemeta, maven, npm, python, ruby
+from .base import BaseMapping
-MAPPINGS = {
+MAPPINGS: Final[Dict[str, Type[BaseMapping]]] = {
"CodemetaMapping": codemeta.CodemetaMapping,
"MavenMapping": maven.MavenMapping,
"NpmMapping": npm.NpmMapping,
@@ -14,7 +17,7 @@
}
-def list_terms():
+def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]:
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
d = collections.defaultdict(set)
@@ -27,7 +30,7 @@
@click.command()
@click.argument("mapping_name")
@click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str) -> None:
from pprint import pprint
with open(file_name, "rb") as fd:
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,11 +5,25 @@
import json
import logging
-from typing import List
+from typing import Dict, Iterable, List, Optional
+
+from typing_extensions import TypedDict
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+class FileEntry(TypedDict):
+ name: bytes
+ sha1: bytes
+ sha1_git: bytes
+ target: bytes
+ length: int
+ status: str
+ type: str
+ perms: int
+ dir_id: bytes
+
+
class BaseMapping:
"""Base class for mappings to inherit from
@@ -19,35 +33,40 @@
- override translate function
"""
- def __init__(self, log_suffix=""):
+ def __init__(self, log_suffix: str = ""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
)
@property
- def name(self):
+ def name(self) -> str:
"""A name of this mapping, used as an identifier in the
indexer storage."""
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
"""
Detects files potentially containing metadata
Args:
- file_entries (list): list of files
+ file_entries: list of files
Returns:
list: list of sha1 (possibly empty)
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content):
+ @classmethod
+ def supported_terms(cls) -> Iterable[str]:
+ """Returns all CodeMeta terms this mapping supports"""
+ raise NotImplementedError(f"{cls.__name__}.supported_terms")
+
+ def translate(self, file_content: bytes) -> Optional[Dict]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
- def normalize_translation(self, metadata):
+ def normalize_translation(self, metadata: Dict) -> Dict:
return compact(metadata)
@@ -55,14 +74,14 @@
"""Base class for all mappings that use a single file as input."""
@property
- def filename(self):
+ def filename(self) -> bytes:
"""The .json file to extract metadata from."""
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
for entry in file_entries:
- if entry["name"].lower() == cls.filename.lower():
+ if entry["name"].lower() == cls.filename.lower(): # type: ignore
return [entry["sha1"]]
return []
@@ -71,36 +90,36 @@
"""Base class for mappings that take as input a file that is mostly
a key-value store (eg. a shallow JSON dict)."""
- string_fields = [] # type: List[str]
+ string_fields: List[str] = []
"""List of fields that are simple strings, and don't need any
normalization."""
@property
- def mapping(self):
+ def mapping(self) -> Dict[str, str]:
"""A translation dict to map dict keys into a canonical name."""
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name):
+ def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> Iterable[str]:
return {
term
- for (key, term) in cls.mapping.items()
+ for (key, term) in cls.mapping.items() # type: ignore
if key in cls.string_fields
or hasattr(cls, "translate_" + cls._normalize_method_name(key))
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
- def _translate_dict(self, content_dict, *, normalize=True):
+ def _translate_dict(self, content_dict: Dict, *, normalize: bool = True) -> Dict:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
Args:
- content_dict (dict): content dict to translate
+ content_dict: content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
@@ -150,13 +169,13 @@
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Optional[Dict]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
Args:
- raw_content (bytes): raw content to translate
+ raw_content: raw content to translate
Returns:
dict: translated metadata in json-friendly form needed for
@@ -164,14 +183,15 @@
"""
try:
- raw_content = raw_content.decode()
+ content: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
try:
- content_dict = json.loads(raw_content)
+ content_dict = json.loads(content)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
- return
+ return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
+ return None
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,3 +1,5 @@
+from typing import Any, Dict, List, Optional
+
import yaml
from swh.indexer.codemeta import CODEMETA_CONTEXT_URL, CROSSWALK_TABLE, SCHEMA_URI
@@ -18,19 +20,19 @@
mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
string_fields = ["keywords", "license", "abstract", "version", "doi"]
- def translate(self, raw_content):
- raw_content = raw_content.decode()
- content_dict = yaml.load(raw_content, Loader=yaml.SafeLoader)
+ def translate(self, raw_content: bytes) -> Dict:
+ content: str = raw_content.decode()
+ content_dict = yaml.load(content, Loader=yaml.SafeLoader)
metadata = self._translate_dict(content_dict)
metadata["@context"] = CODEMETA_CONTEXT_URL
return metadata
- def normalize_authors(self, d):
- result = []
+ def normalize_authors(self, d) -> Dict[str, Any]:
+ result: List[Dict[str, Any]] = []
for author in d:
- author_data = {"@type": SCHEMA_URI + "Person"}
+ author_data: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"}
if "orcid" in author:
author_data["@id"] = author["orcid"]
if "affiliation" in author:
@@ -45,21 +47,24 @@
result.append(author_data)
- result = {"@list": result}
- return result
+ return {"@list": result}
- def normalize_doi(self, s):
+ def normalize_doi(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://doi.org/" + s}
+ return None
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
+ return None
- def normalize_repository_code(self, s):
+ def normalize_repository_code(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_date_released(self, s):
+ def normalize_date_released(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@value": s, "@type": SCHEMA_URI + "Date"}
+ return None
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import json
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CODEMETA_TERMS, expand
@@ -20,10 +21,10 @@
string_fields = None
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import os
+from typing import Any, Dict, List, Optional
import xml.parsers.expat
import xmltodict
@@ -46,7 +47,7 @@
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
- def parse_repositories(self, d):
+ def parse_repositories(self, d: Dict) -> Optional[List[Optional[Dict[str, Any]]]]:
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
@@ -75,11 +76,11 @@
results = []
return [res for res in results if res] or None
- def parse_repository(self, d, repo):
+ def parse_repository(self, d: Dict, repo) -> Optional[Dict[str, Any]]:
if not isinstance(repo, dict):
- return
+ return None
if repo.get("layout", "default") != "default":
- return # TODO ?
+ return None # TODO ?
url = repo.get("url")
group_id = d.get("groupId")
artifact_id = d.get("artifactId")
@@ -90,8 +91,9 @@
):
repo = os.path.join(url, *group_id.split("."), artifact_id)
return {"@id": repo}
+ return None
- def normalize_groupId(self, id_):
+ def normalize_groupId(self, id_) -> Optional[Dict[str, Any]]:
"""https://maven.apache.org/pom.html#Maven_Coordinates
>>> MavenMapping().normalize_groupId('org.example')
@@ -99,8 +101,9 @@
"""
if isinstance(id_, str):
return {"@id": id_}
+ return None
- def parse_licenses(self, d):
+ def parse_licenses(self, d) -> Optional[List[Dict[str, Any]]]:
"""https://maven.apache.org/pom.html#Licenses
>>> import xmltodict
@@ -148,12 +151,12 @@
licenses = d.get("licenses")
if not isinstance(licenses, dict):
- return
+ return None
licenses = licenses.get("license")
if isinstance(licenses, dict):
licenses = [licenses]
elif not isinstance(licenses, list):
- return
+ return None
return [
{"@id": license["url"]}
for license in licenses
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import re
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -29,7 +30,7 @@
# 'bitbucket': 'https://bitbucket.org/',
}
- def normalize_repository(self, d):
+ def normalize_repository(self, d) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#repository
>>> NpmMapping().normalize_repository({
@@ -67,7 +68,7 @@
return {"@id": url}
- def normalize_bugs(self, d):
+ def normalize_bugs(self, d) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
@@ -90,7 +91,7 @@
r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$"
)
- def normalize_author(self, d):
+ def normalize_author(self, d) -> Optional[Dict[str, Any]]:
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
@@ -111,7 +112,7 @@
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
""" # noqa
- author = {"@type": SCHEMA_URI + "Person"}
+ author: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"}
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
@@ -133,7 +134,7 @@
author[SCHEMA_URI + "url"] = {"@id": url}
return {"@list": [author]}
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
@@ -141,8 +142,9 @@
"""
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
+ return None
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
@@ -150,8 +152,9 @@
"""
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_keywords(self, lst):
+ def normalize_keywords(self, lst: List) -> Optional[List[str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
@@ -159,3 +162,4 @@
"""
if isinstance(lst, list):
return [x for x in lst if isinstance(x, str)]
+ return None
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -6,6 +6,7 @@
import email.parser
import email.policy
import itertools
+from typing import Dict, List
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -44,9 +45,9 @@
_parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())
- def translate(self, content):
+ def translate(self, content: bytes) -> Dict:
msg = self._parser.parsebytes(content)
- d = {}
+ d: Dict[str, List[str]] = {}
for (key, value) in msg.items():
key = _normalize_pkginfo_key(key)
if value != "UNKNOWN":
@@ -66,11 +67,11 @@
}
return self.normalize_translation(metadata)
- def normalize_home_page(self, urls):
+ def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]:
return [{"@id": url} for url in urls]
- def normalize_keywords(self, keywords):
+ def normalize_keywords(self, keywords: List[str]) -> List[str]:
return list(itertools.chain.from_iterable(s.split(" ") for s in keywords))
- def normalize_license(self, licenses):
+ def normalize_license(self, licenses: str) -> List[Dict[str, str]]:
return [{"@id": license} for license in licenses]
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -6,13 +6,14 @@
import ast
import itertools
import re
+from typing import Any, Dict, List, Optional, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
-from .base import DictMapping
+from .base import DictMapping, FileEntry
-def name_to_person(name):
+def name_to_person(name: str) -> Dict[str, str]:
return {
"@type": SCHEMA_URI + "Person",
SCHEMA_URI + "name": name,
@@ -28,29 +29,29 @@
_re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
for entry in file_entries:
if entry["name"].endswith(b".gemspec"):
return [entry["sha1"]]
return []
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
try:
- raw_content = raw_content.decode()
+ content: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
# Skip lines before 'Gem::Specification.new'
lines = itertools.dropwhile(
- lambda x: not self._re_spec_new.match(x), raw_content.split("\n")
+ lambda x: not self._re_spec_new.match(x), content.split("\n")
)
try:
next(lines) # Consume 'Gem::Specification.new'
except StopIteration:
self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
- return
+ return None
content_dict = {}
for line in lines:
@@ -61,7 +62,7 @@
content_dict[match.group("key")] = value
return self._translate_dict(content_dict)
- def eval_ruby_expression(self, expr):
+ def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List]]:
"""Very simple evaluator of Ruby expressions.
>>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -97,31 +98,36 @@
# of such strings).
tree = ast.parse(expr, mode="eval")
except (SyntaxError, ValueError):
- return
+ return None
if isinstance(tree, ast.Expression):
return evaluator(tree.body)
+ return None
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[List[Dict[str, str]]]:
if isinstance(s, str):
return [{"@id": "https://spdx.org/licenses/" + s}]
+ return None
- def normalize_licenses(self, licenses):
+ def normalize_licenses(self, licenses) -> Optional[List[Dict[str, str]]]:
if isinstance(licenses, list):
return [
{"@id": "https://spdx.org/licenses/" + license}
for license in licenses
if isinstance(license, str)
]
+ return None
- def normalize_author(self, author):
+ def normalize_author(self, author) -> Optional[Dict[str, Any]]:
if isinstance(author, str):
return {"@list": [name_to_person(author)]}
+ return None
- def normalize_authors(self, authors):
+ def normalize_authors(self, authors) -> Optional[Dict[str, List[Dict[str, Any]]]]:
if isinstance(authors, list):
return {
"@list": [
@@ -130,3 +136,4 @@
if isinstance(author, str)
]
}
+ return None
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 5:07 AM (8 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223291
Attached To
D7342: Type annotations in metadata mappings
Event Timeline
Log In to Comment