Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123050
D5295.id18958.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
D5295.id18958.diff
View Options
diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,4 +1,6 @@
import collections
+from types import ModuleType
+from typing import DefaultDict, Set
import click
@@ -13,20 +15,20 @@
}
-def list_terms():
+def list_terms() -> DefaultDict[str, Set[ModuleType]]:
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
- d = collections.defaultdict(set)
+ d: DefaultDict[str, Set[ModuleType]] = collections.defaultdict(set)
for mapping in MAPPINGS.values():
- for term in mapping.supported_terms():
- d[term].add(mapping)
+ for term in mapping.supported_terms(): # type: ignore
+ d[term].add(mapping) # type: ignore
return d
@click.command()
@click.argument("mapping_name")
@click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str) -> None:
from pprint import pprint
with open(file_name, "rb") as fd:
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,7 +5,7 @@
import json
import logging
-from typing import List
+from typing import Any, Dict, List, Set
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
@@ -19,7 +19,7 @@
- override translate function
"""
- def __init__(self, log_suffix=""):
+ def __init__(self, log_suffix: str = ""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
@@ -32,7 +32,7 @@
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files):
+ def detect_metadata_files(cls, files: List) -> List[str]:
"""
Detects files potentially containing metadata
@@ -44,10 +44,10 @@
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content):
+ def translate(self, file_content: bytes) -> Any:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
- def normalize_translation(self, metadata):
+ def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
return compact(metadata)
@@ -60,9 +60,9 @@
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[str]:
for entry in file_entries:
- if entry["name"].lower() == cls.filename.lower():
+ if entry["name"].lower() == cls.filename.lower(): # type: ignore
return [entry["sha1"]]
return []
@@ -71,7 +71,7 @@
"""Base class for mappings that take as input a file that is mostly
a key-value store (eg. a shallow JSON dict)."""
- string_fields = [] # type: List[str]
+ string_fields: List[str] = []
"""List of fields that are simple strings, and don't need any
normalization."""
@@ -81,20 +81,20 @@
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name):
+ def _normalize_method_name(name: str):
return name.replace("-", "_")
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> Set[str]:
return {
term
- for (key, term) in cls.mapping.items()
+ for (key, term) in cls.mapping.items() # type: ignore
if key in cls.string_fields
or hasattr(cls, "translate_" + cls._normalize_method_name(key))
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
- def _translate_dict(self, content_dict, *, normalize=True):
+ def _translate_dict(self, content_dict: Dict, *, normalize=True) -> Dict:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
@@ -150,7 +150,7 @@
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Any:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
@@ -164,7 +164,7 @@
"""
try:
- raw_content = raw_content.decode()
+ raw_content = raw_content.decode() # type: ignore
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
return
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import json
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CODEMETA_TERMS, expand
@@ -20,10 +21,10 @@
string_fields = None
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import os
+from typing import Any, Dict, List, Optional
import xml.parsers.expat
import xmltodict
@@ -23,7 +24,7 @@
mapping = CROSSWALK_TABLE["Java (Maven)"]
string_fields = ["name", "version", "description", "email"]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
d = xmltodict.parse(content).get("project") or {}
except xml.parsers.expat.ExpatError:
@@ -43,7 +44,7 @@
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
- def parse_repositories(self, d):
+ def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Any]]:
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
@@ -72,11 +73,11 @@
results = []
return [res for res in results if res] or None
- def parse_repository(self, d, repo):
+ def parse_repository(self, d: Dict[str, Any], repo: Dict[str, Any]) -> Any:
if not isinstance(repo, dict):
return
if repo.get("layout", "default") != "default":
- return # TODO ?
+ return None # TODO ?
url = repo.get("url")
group_id = d.get("groupId")
artifact_id = d.get("artifactId")
@@ -85,10 +86,9 @@
and isinstance(group_id, str)
and isinstance(artifact_id, str)
):
- repo = os.path.join(url, *group_id.split("."), artifact_id)
- return {"@id": repo}
+ return {"@id": os.path.join(url, *group_id.split("."), artifact_id)}
- def normalize_groupId(self, id_):
+ def normalize_groupId(self, id_: str) -> Dict[str, str]:
"""https://maven.apache.org/pom.html#Maven_Coordinates
>>> MavenMapping().normalize_groupId('org.example')
@@ -97,7 +97,7 @@
if isinstance(id_, str):
return {"@id": id_}
- def parse_licenses(self, d):
+ def parse_licenses(self, d: Dict[str, Any]) -> Optional[List[Dict[str, str]]]:
"""https://maven.apache.org/pom.html#Licenses
>>> import xmltodict
@@ -145,12 +145,12 @@
licenses = d.get("licenses")
if not isinstance(licenses, dict):
- return
+ return None
licenses = licenses.get("license")
if isinstance(licenses, dict):
licenses = [licenses]
elif not isinstance(licenses, list):
- return
+ return None
return [
{"@id": license["url"]}
for license in licenses
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import re
+from typing import Any, Dict, List, Optional, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -29,7 +30,9 @@
# 'bitbucket': 'https://bitbucket.org/',
}
- def normalize_repository(self, d):
+ def normalize_repository(
+ self, d: Union[Dict, str, Any]
+ ) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#repository
>>> NpmMapping().normalize_repository({
@@ -67,7 +70,7 @@
return {"@id": url}
- def normalize_bugs(self, d):
+ def normalize_bugs(self, d: Union[Dict, str, Any]) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
@@ -90,7 +93,9 @@
r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$"
)
- def normalize_author(self, d):
+ def normalize_author(
+ self, d: Union[Dict, str, Any]
+ ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
@@ -130,10 +135,10 @@
if email and isinstance(email, str):
author[SCHEMA_URI + "email"] = email
if url and isinstance(url, str):
- author[SCHEMA_URI + "url"] = {"@id": url}
+ author[SCHEMA_URI + "url"] = {"@id": url} # type: ignore
return {"@list": [author]}
- def normalize_license(self, s):
+ def normalize_license(self, s: str) -> Any:
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
@@ -142,7 +147,7 @@
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s: str) -> Any:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
@@ -151,7 +156,7 @@
if isinstance(s, str):
return {"@id": s}
- def normalize_keywords(self, lst):
+ def normalize_keywords(self, lst: List[str]) -> Any:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -6,6 +6,7 @@
import email.parser
import email.policy
import itertools
+from typing import Any, Dict, List
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -15,11 +16,11 @@
class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy):
- def header_fetch_parse(self, name, value):
+ def header_fetch_parse(self, name: str, value: str) -> str:
if hasattr(value, "name"):
return value
value = value.replace("\n ", "\n")
- return self.header_factory(name, value)
+ return self.header_factory(name, value) # type: ignore
class PythonPkginfoMapping(DictMapping, SingleFileMapping):
@@ -44,9 +45,9 @@
_parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())
- def translate(self, content):
+ def translate(self, content: bytes) -> Dict[str, Any]:
msg = self._parser.parsebytes(content)
- d = {}
+ d: Dict[str, List[str]] = {}
for (key, value) in msg.items():
key = _normalize_pkginfo_key(key)
if value != "UNKNOWN":
@@ -66,11 +67,11 @@
}
return self.normalize_translation(metadata)
- def normalize_home_page(self, urls):
+ def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]:
return [{"@id": url} for url in urls]
- def normalize_keywords(self, keywords):
+ def normalize_keywords(self, keywords: List[str]) -> List[str]:
return list(itertools.chain.from_iterable(s.split(" ") for s in keywords))
- def normalize_license(self, licenses):
+ def normalize_license(self, licenses: List[str]) -> List[Dict[str, str]]:
return [{"@id": license} for license in licenses]
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -6,13 +6,14 @@
import ast
import itertools
import re
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
from .base import DictMapping
-def name_to_person(name):
+def name_to_person(name: str) -> Dict[str, str]:
return {
"@type": SCHEMA_URI + "Person",
SCHEMA_URI + "name": name,
@@ -28,18 +29,18 @@
_re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls: Any, file_entries: Any) -> List[str]:
for entry in file_entries:
if entry["name"].endswith(b".gemspec"):
return [entry["sha1"]]
return []
- def translate(self, raw_content):
+ def translate(self, raw_content: Any) -> Optional[Dict[str, str]]:
try:
raw_content = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
# Skip lines before 'Gem::Specification.new'
lines = itertools.dropwhile(
@@ -50,7 +51,7 @@
next(lines) # Consume 'Gem::Specification.new'
except StopIteration:
self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
- return
+ return None
content_dict = {}
for line in lines:
@@ -61,7 +62,7 @@
content_dict[match.group("key")] = value
return self._translate_dict(content_dict)
- def eval_ruby_expression(self, expr):
+ def eval_ruby_expression(self, expr: str) -> Any:
"""Very simple evaluator of Ruby expressions.
>>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -97,19 +98,19 @@
# of such strings).
tree = ast.parse(expr, mode="eval")
except (SyntaxError, ValueError):
- return
+ return None
if isinstance(tree, ast.Expression):
return evaluator(tree.body)
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s: str) -> Dict[str, str]:
if isinstance(s, str):
return {"@id": s}
- def normalize_license(self, s):
+ def normalize_license(self, s: str) -> List[Dict[str, str]]:
if isinstance(s, str):
return [{"@id": "https://spdx.org/licenses/" + s}]
- def normalize_licenses(self, licenses):
+ def normalize_licenses(self, licenses: List[str]) -> Any:
if isinstance(licenses, list):
return [
{"@id": "https://spdx.org/licenses/" + license}
@@ -117,11 +118,11 @@
if isinstance(license, str)
]
- def normalize_author(self, author):
+ def normalize_author(self, author: str) -> Any:
if isinstance(author, str):
return {"@list": [name_to_person(author)]}
- def normalize_authors(self, authors):
+ def normalize_authors(self, authors: List[str]) -> Any:
if isinstance(authors, list):
return {
"@list": [
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 5:42 PM (2 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3233716
Attached To
D5295: Add type annotations to metadata mappings
Event Timeline
Log In to Comment