Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066279
D5295.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
D5295.diff
View Options
diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,10 +1,10 @@
import collections
-from types import ModuleType
-from typing import DefaultDict, Set
+from typing import DefaultDict, Set, Type
import click
from . import codemeta, maven, npm, python, ruby
+from .base import BaseMapping
MAPPINGS = {
"CodemetaMapping": codemeta.CodemetaMapping,
@@ -15,13 +15,13 @@
}
-def list_terms() -> DefaultDict[str, Set[ModuleType]]:
+def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]:
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
- d: DefaultDict[str, Set[ModuleType]] = collections.defaultdict(set)
+ d: DefaultDict[str, Set[Type[BaseMapping]]] = collections.defaultdict(set)
for mapping in MAPPINGS.values():
- for term in mapping.supported_terms(): # type: ignore
- d[term].add(mapping) # type: ignore
+ for term in mapping.supported_terms(): # type:ignore [attr-defined]
+ d[term].add(mapping)
return d
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,11 +5,26 @@
import json
import logging
-from typing import Any, Dict, List, Set
+from typing import Any, Dict, List, Optional, Set
+
+from typing_extensions import TypedDict
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+class File_entries(TypedDict):
+ name: bytes
+ type: str
+ dir_id: bytes
+ sha1_git: Optional[bytes]
+ target: Optional[bytes]
+ length: Optional[int]
+ status: Optional[str]
+ perms: Optional[int]
+ sha1: bytes
+ sha256: Optional[bytes]
+
+
class BaseMapping:
"""Base class for mappings to inherit from
@@ -32,7 +47,7 @@
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files: List) -> List[str]:
+ def detect_metadata_files(cls, files: List[File_entries]) -> List[bytes]:
"""
Detects files potentially containing metadata
@@ -44,7 +59,7 @@
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content: bytes) -> Any:
+ def translate(self, file_content: bytes) -> Optional[Dict[str, Any]]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
@@ -60,10 +75,11 @@
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[str]:
+ def detect_metadata_files(cls, file_entries: List[File_entries]) -> List[bytes]:
for entry in file_entries:
- if entry["name"].lower() == cls.filename.lower(): # type: ignore
- return [entry["sha1"]]
+ if isinstance(entry["name"], bytes) and isinstance(cls.filename, bytes):
+ if entry["name"].lower() == cls.filename.lower():
+ return [entry["sha1"]]
return []
@@ -81,20 +97,24 @@
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name: str):
+ def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls) -> Set[str]:
- return {
- term
- for (key, term) in cls.mapping.items() # type: ignore
- if key in cls.string_fields
- or hasattr(cls, "translate_" + cls._normalize_method_name(key))
- or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
- }
-
- def _translate_dict(self, content_dict: Dict, *, normalize=True) -> Dict:
+ if isinstance(cls.mapping, Dict):
+ return {
+ term
+ for (key, term) in cls.mapping.items()
+ if key in cls.string_fields
+ or hasattr(cls, "translate_" + cls._normalize_method_name(key))
+ or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
+ }
+ return set()
+
+ def _translate_dict(
+ self, content_dict: Dict[str, Any], *, normalize=True
+ ) -> Dict[str, Any]:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
@@ -150,7 +170,7 @@
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content: bytes) -> Any:
+ def translate(self, raw_content_bytes: bytes) -> Optional[Dict[str, Any]]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
@@ -164,14 +184,16 @@
"""
try:
- raw_content = raw_content.decode() # type: ignore
+ raw_content = raw_content_bytes.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
try:
content_dict = json.loads(raw_content)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
- return
+ return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
+ else:
+ return None
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
import json
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
from swh.indexer.codemeta import CODEMETA_TERMS, expand
@@ -24,7 +24,11 @@
def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[
+ Dict[str, Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]]]
+ ]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
import os
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple, Union
import xml.parsers.expat
import xmltodict
@@ -24,7 +24,9 @@
mapping = CROSSWALK_TABLE["Java (Maven)"]
string_fields = ["name", "version", "description", "email"]
- def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
try:
d = xmltodict.parse(content).get("project") or {}
except xml.parsers.expat.ExpatError:
@@ -44,7 +46,7 @@
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
- def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Any]]:
+ def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Dict[str, str]]]:
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
@@ -73,9 +75,11 @@
results = []
return [res for res in results if res] or None
- def parse_repository(self, d: Dict[str, Any], repo: Dict[str, Any]) -> Any:
+ def parse_repository(
+ self, d: Dict[str, Any], repo: Dict[str, Any]
+ ) -> Optional[Dict[str, str]]:
if not isinstance(repo, dict):
- return
+ return None
if repo.get("layout", "default") != "default":
return None # TODO ?
url = repo.get("url")
@@ -87,6 +91,8 @@
and isinstance(artifact_id, str)
):
return {"@id": os.path.join(url, *group_id.split("."), artifact_id)}
+ else:
+ return None
def normalize_groupId(self, id_: str) -> Dict[str, str]:
"""https://maven.apache.org/pom.html#Maven_Coordinates
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -31,7 +31,7 @@
}
def normalize_repository(
- self, d: Union[Dict, str, Any]
+ self, d: Union[Dict[str, Any], str]
) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#repository
@@ -70,7 +70,7 @@
return {"@id": url}
- def normalize_bugs(self, d: Union[Dict, str, Any]) -> Optional[Dict[str, str]]:
+ def normalize_bugs(self, d: Union[Dict, str]) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
@@ -94,8 +94,8 @@
)
def normalize_author(
- self, d: Union[Dict, str, Any]
- ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
+ self, d: Union[Dict, str]
+ ) -> Optional[Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]:
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
@@ -116,7 +116,7 @@
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
""" # noqa
- author = {"@type": SCHEMA_URI + "Person"}
+ author: Dict[str, Union[str, Dict[str, str]]] = {"@type": SCHEMA_URI + "Person"}
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
@@ -135,10 +135,10 @@
if email and isinstance(email, str):
author[SCHEMA_URI + "email"] = email
if url and isinstance(url, str):
- author[SCHEMA_URI + "url"] = {"@id": url} # type: ignore
+ author[SCHEMA_URI + "url"] = {"@id": url}
return {"@list": [author]}
- def normalize_license(self, s: str) -> Any:
+ def normalize_license(self, s: str) -> Dict[str, str]:
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
@@ -147,7 +147,7 @@
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
- def normalize_homepage(self, s: str) -> Any:
+ def normalize_homepage(self, s: str) -> Dict[str, str]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
@@ -156,7 +156,7 @@
if isinstance(s, str):
return {"@id": s}
- def normalize_keywords(self, lst: List[str]) -> Any:
+ def normalize_keywords(self, lst: List[str]) -> Optional[List[str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
@@ -164,3 +164,5 @@
"""
if isinstance(lst, list):
return [x for x in lst if isinstance(x, str)]
+ else:
+ return None
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -6,7 +6,7 @@
import email.parser
import email.policy
import itertools
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional, Tuple, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -45,7 +45,9 @@
_parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())
- def translate(self, content: bytes) -> Dict[str, Any]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
msg = self._parser.parsebytes(content)
d: Dict[str, List[str]] = {}
for (key, value) in msg.items():
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -6,11 +6,11 @@
import ast
import itertools
import re
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
-from .base import DictMapping
+from .base import DictMapping, File_entries
def name_to_person(name: str) -> Dict[str, str]:
@@ -29,15 +29,20 @@
_re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
- def detect_metadata_files(cls: Any, file_entries: Any) -> List[str]:
+ def detect_metadata_files(
+ cls: Any, file_entries: List[File_entries]
+ ) -> List[bytes]:
for entry in file_entries:
- if entry["name"].endswith(b".gemspec"):
- return [entry["sha1"]]
+ if isinstance(entry["name"], bytes):
+ if entry["name"].endswith(b".gemspec"):
+ return [entry["sha1"]]
return []
- def translate(self, raw_content: Any) -> Optional[Dict[str, str]]:
+ def translate(
+ self, raw_content_bytes: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
try:
- raw_content = raw_content.decode()
+ raw_content = raw_content_bytes.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
return None
@@ -53,7 +58,7 @@
self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
return None
- content_dict = {}
+ content_dict: Dict[str, Union[str, List[str]]] = {}
for line in lines:
match = self._re_spec_entry.match(line)
if match:
@@ -62,7 +67,7 @@
content_dict[match.group("key")] = value
return self._translate_dict(content_dict)
- def eval_ruby_expression(self, expr: str) -> Any:
+ def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List[str]]]:
"""Very simple evaluator of Ruby expressions.
>>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -101,6 +106,8 @@
return None
if isinstance(tree, ast.Expression):
return evaluator(tree.body)
+ else:
+ return None
def normalize_homepage(self, s: str) -> Dict[str, str]:
if isinstance(s, str):
@@ -110,19 +117,22 @@
if isinstance(s, str):
return [{"@id": "https://spdx.org/licenses/" + s}]
- def normalize_licenses(self, licenses: List[str]) -> Any:
+ def normalize_licenses(self, licenses: List[str]) -> List[Dict[str, str]]:
if isinstance(licenses, list):
return [
{"@id": "https://spdx.org/licenses/" + license}
for license in licenses
if isinstance(license, str)
]
+ return []
- def normalize_author(self, author: str) -> Any:
+ def normalize_author(self, author: str) -> Dict[str, List[Dict[str, str]]]:
if isinstance(author, str):
return {"@list": [name_to_person(author)]}
- def normalize_authors(self, authors: List[str]) -> Any:
+ def normalize_authors(
+ self, authors: List[str]
+ ) -> Optional[Dict[str, List[Dict[str, str]]]]:
if isinstance(authors, list):
return {
"@list": [
@@ -131,3 +141,5 @@
if isinstance(author, str)
]
}
+ else:
+ return None
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 5:05 AM (8 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225976
Attached To
D5295: Add type annotations to metadata mappings
Event Timeline
Log In to Comment