Page Menu | Home | Software Heritage

D5295.id19079.diff
No One | Temporary

D5295.id19079.diff

diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,10 +1,10 @@
import collections
-from types import ModuleType
-from typing import DefaultDict, Set
+from typing import DefaultDict, Set, Type
import click
from . import codemeta, maven, npm, python, ruby
+from .base import BaseMapping
MAPPINGS = {
"CodemetaMapping": codemeta.CodemetaMapping,
@@ -15,13 +15,13 @@
}
-def list_terms() -> DefaultDict[str, Set[ModuleType]]:
+def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]:
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
- d: DefaultDict[str, Set[ModuleType]] = collections.defaultdict(set)
+ d: DefaultDict[str, Set[Type[BaseMapping]]] = collections.defaultdict(set)
for mapping in MAPPINGS.values():
- for term in mapping.supported_terms(): # type: ignore
- d[term].add(mapping) # type: ignore
+ for term in mapping.supported_terms(): # type:ignore [attr-defined]
+ d[term].add(mapping)
return d
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,11 +5,26 @@
import json
import logging
-from typing import Any, Dict, List, Set
+from typing import Any, Dict, List, Optional, Set
+
+from typing_extensions import TypedDict
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+class File_entries(TypedDict):
+ name: bytes
+ type: str
+ dir_id: bytes
+ sha1_git: Optional[bytes]
+ target: Optional[bytes]
+ length: Optional[int]
+ status: Optional[str]
+ perms: Optional[int]
+ sha1: bytes
+ sha256: Optional[bytes]
+
+
class BaseMapping:
"""Base class for mappings to inherit from
@@ -32,7 +47,7 @@
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files: List) -> List[str]:
+ def detect_metadata_files(cls, files: List[File_entries]) -> List[bytes]:
"""
Detects files potentially containing metadata
@@ -44,7 +59,7 @@
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content: bytes) -> Any:
+ def translate(self, file_content: bytes) -> Optional[Dict[str, Any]]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
@@ -60,10 +75,11 @@
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries: List[Dict[str, Any]]) -> List[str]:
+ def detect_metadata_files(cls, file_entries: List[File_entries]) -> List[bytes]:
for entry in file_entries:
- if entry["name"].lower() == cls.filename.lower(): # type: ignore
- return [entry["sha1"]]
+ if isinstance(entry["name"], bytes) and isinstance(cls.filename, bytes):
+ if entry["name"].lower() == cls.filename.lower():
+ return [entry["sha1"]]
return []
@@ -81,20 +97,24 @@
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name: str):
+ def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
def supported_terms(cls) -> Set[str]:
- return {
- term
- for (key, term) in cls.mapping.items() # type: ignore
- if key in cls.string_fields
- or hasattr(cls, "translate_" + cls._normalize_method_name(key))
- or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
- }
-
- def _translate_dict(self, content_dict: Dict, *, normalize=True) -> Dict:
+ if isinstance(cls.mapping, Dict):
+ return {
+ term
+ for (key, term) in cls.mapping.items()
+ if key in cls.string_fields
+ or hasattr(cls, "translate_" + cls._normalize_method_name(key))
+ or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
+ }
+ return set()
+
+ def _translate_dict(
+ self, content_dict: Dict[str, Any], *, normalize=True
+ ) -> Dict[str, Any]:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
@@ -150,7 +170,7 @@
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content: bytes) -> Any:
+ def translate(self, raw_content_bytes: bytes) -> Optional[Dict[str, Any]]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
@@ -164,14 +184,16 @@
"""
try:
- raw_content = raw_content.decode() # type: ignore
+ raw_content = raw_content_bytes.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
try:
content_dict = json.loads(raw_content)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
- return
+ return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
+ else:
+ return None
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
import json
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
from swh.indexer.codemeta import CODEMETA_TERMS, expand
@@ -24,7 +24,11 @@
def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[
+ Dict[str, Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]]]
+ ]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,7 +4,7 @@
# See top-level LICENSE file for more information
import os
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple, Union
import xml.parsers.expat
import xmltodict
@@ -24,7 +24,9 @@
mapping = CROSSWALK_TABLE["Java (Maven)"]
string_fields = ["name", "version", "description", "email"]
- def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
try:
d = xmltodict.parse(content).get("project") or {}
except xml.parsers.expat.ExpatError:
@@ -44,7 +46,7 @@
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
- def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Any]]:
+ def parse_repositories(self, d: Dict[str, Any]) -> Optional[List[Dict[str, str]]]:
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
@@ -73,9 +75,11 @@
results = []
return [res for res in results if res] or None
- def parse_repository(self, d: Dict[str, Any], repo: Dict[str, Any]) -> Any:
+ def parse_repository(
+ self, d: Dict[str, Any], repo: Dict[str, Any]
+ ) -> Optional[Dict[str, str]]:
if not isinstance(repo, dict):
- return
+ return None
if repo.get("layout", "default") != "default":
return None # TODO ?
url = repo.get("url")
@@ -87,6 +91,8 @@
and isinstance(artifact_id, str)
):
return {"@id": os.path.join(url, *group_id.split("."), artifact_id)}
+ else:
+ return None
def normalize_groupId(self, id_: str) -> Dict[str, str]:
"""https://maven.apache.org/pom.html#Maven_Coordinates
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -31,7 +31,7 @@
}
def normalize_repository(
- self, d: Union[Dict, str, Any]
+ self, d: Union[Dict[str, Any], str]
) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#repository
@@ -70,7 +70,7 @@
return {"@id": url}
- def normalize_bugs(self, d: Union[Dict, str, Any]) -> Optional[Dict[str, str]]:
+ def normalize_bugs(self, d: Union[Dict, str]) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
@@ -94,8 +94,8 @@
)
def normalize_author(
- self, d: Union[Dict, str, Any]
- ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
+ self, d: Union[Dict, str]
+ ) -> Optional[Dict[str, List[Dict[str, Union[str, Dict[str, str]]]]]]:
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
@@ -116,7 +116,7 @@
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
""" # noqa
- author = {"@type": SCHEMA_URI + "Person"}
+ author: Dict[str, Union[str, Dict[str, str]]] = {"@type": SCHEMA_URI + "Person"}
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
@@ -135,10 +135,10 @@
if email and isinstance(email, str):
author[SCHEMA_URI + "email"] = email
if url and isinstance(url, str):
- author[SCHEMA_URI + "url"] = {"@id": url} # type: ignore
+ author[SCHEMA_URI + "url"] = {"@id": url}
return {"@list": [author]}
- def normalize_license(self, s: str) -> Any:
+ def normalize_license(self, s: str) -> Dict[str, str]:
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
@@ -147,7 +147,7 @@
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
- def normalize_homepage(self, s: str) -> Any:
+ def normalize_homepage(self, s: str) -> Dict[str, str]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
@@ -156,7 +156,7 @@
if isinstance(s, str):
return {"@id": s}
- def normalize_keywords(self, lst: List[str]) -> Any:
+ def normalize_keywords(self, lst: List[str]) -> Optional[List[str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
@@ -164,3 +164,5 @@
"""
if isinstance(lst, list):
return [x for x in lst if isinstance(x, str)]
+ else:
+ return None
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -6,7 +6,7 @@
import email.parser
import email.policy
import itertools
-from typing import Any, Dict, List
+from typing import Any, Dict, List, Optional, Tuple, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -45,7 +45,9 @@
_parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())
- def translate(self, content: bytes) -> Dict[str, Any]:
+ def translate(
+ self, content: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
msg = self._parser.parsebytes(content)
d: Dict[str, List[str]] = {}
for (key, value) in msg.items():
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -6,11 +6,11 @@
import ast
import itertools
import re
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
-from .base import DictMapping
+from .base import DictMapping, File_entries
def name_to_person(name: str) -> Dict[str, str]:
@@ -29,15 +29,20 @@
_re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
- def detect_metadata_files(cls: Any, file_entries: Any) -> List[str]:
+ def detect_metadata_files(
+ cls: Any, file_entries: List[File_entries]
+ ) -> List[bytes]:
for entry in file_entries:
- if entry["name"].endswith(b".gemspec"):
- return [entry["sha1"]]
+ if isinstance(entry["name"], bytes):
+ if entry["name"].endswith(b".gemspec"):
+ return [entry["sha1"]]
return []
- def translate(self, raw_content: Any) -> Optional[Dict[str, str]]:
+ def translate(
+ self, raw_content_bytes: bytes
+ ) -> Optional[Dict[str, Union[str, List[Any], Dict[str, Any], Tuple[str]]]]:
try:
- raw_content = raw_content.decode()
+ raw_content = raw_content_bytes.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
return None
@@ -53,7 +58,7 @@
self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
return None
- content_dict = {}
+ content_dict: Dict[str, Union[str, List[str]]] = {}
for line in lines:
match = self._re_spec_entry.match(line)
if match:
@@ -62,7 +67,7 @@
content_dict[match.group("key")] = value
return self._translate_dict(content_dict)
- def eval_ruby_expression(self, expr: str) -> Any:
+ def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List[str]]]:
"""Very simple evaluator of Ruby expressions.
>>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -101,6 +106,8 @@
return None
if isinstance(tree, ast.Expression):
return evaluator(tree.body)
+ else:
+ return None
def normalize_homepage(self, s: str) -> Dict[str, str]:
if isinstance(s, str):
@@ -110,19 +117,22 @@
if isinstance(s, str):
return [{"@id": "https://spdx.org/licenses/" + s}]
- def normalize_licenses(self, licenses: List[str]) -> Any:
+ def normalize_licenses(self, licenses: List[str]) -> List[Dict[str, str]]:
if isinstance(licenses, list):
return [
{"@id": "https://spdx.org/licenses/" + license}
for license in licenses
if isinstance(license, str)
]
+ return []
- def normalize_author(self, author: str) -> Any:
+ def normalize_author(self, author: str) -> Dict[str, List[Dict[str, str]]]:
if isinstance(author, str):
return {"@list": [name_to_person(author)]}
- def normalize_authors(self, authors: List[str]) -> Any:
+ def normalize_authors(
+ self, authors: List[str]
+ ) -> Optional[Dict[str, List[Dict[str, str]]]]:
if isinstance(authors, list):
return {
"@list": [
@@ -131,3 +141,5 @@
if isinstance(author, str)
]
}
+ else:
+ return None

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 5:48 AM (8 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3225976

Event Timeline