Page MenuHomeSoftware Heritage

D7342.diff
No OneTemporary

D7342.diff

diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py
--- a/swh/indexer/metadata_dictionary/__init__.py
+++ b/swh/indexer/metadata_dictionary/__init__.py
@@ -1,10 +1,13 @@
import collections
+from typing import DefaultDict, Dict, Set, Type
import click
+from typing_extensions import Final
from . import cff, codemeta, maven, npm, python, ruby
+from .base import BaseMapping
-MAPPINGS = {
+MAPPINGS: Final[Dict[str, Type[BaseMapping]]] = {
"CodemetaMapping": codemeta.CodemetaMapping,
"MavenMapping": maven.MavenMapping,
"NpmMapping": npm.NpmMapping,
@@ -14,7 +17,7 @@
}
-def list_terms():
+def list_terms() -> DefaultDict[str, Set[Type[BaseMapping]]]:
"""Returns a dictionary with all supported CodeMeta terms as keys,
and the mappings that support each of them as values."""
d = collections.defaultdict(set)
@@ -27,7 +30,7 @@
@click.command()
@click.argument("mapping_name")
@click.argument("file_name")
-def main(mapping_name, file_name):
+def main(mapping_name: str, file_name: str) -> None:
from pprint import pprint
with open(file_name, "rb") as fd:
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -5,11 +5,25 @@
import json
import logging
-from typing import List
+from typing import Dict, Iterable, List, Optional
+
+from typing_extensions import TypedDict
from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+class FileEntry(TypedDict):
+ name: bytes
+ sha1: bytes
+ sha1_git: bytes
+ target: bytes
+ length: int
+ status: str
+ type: str
+ perms: int
+ dir_id: bytes
+
+
class BaseMapping:
"""Base class for mappings to inherit from
@@ -19,35 +33,40 @@
- override translate function
"""
- def __init__(self, log_suffix=""):
+ def __init__(self, log_suffix: str = ""):
self.log_suffix = log_suffix
self.log = logging.getLogger(
"%s.%s" % (self.__class__.__module__, self.__class__.__name__)
)
@property
- def name(self):
+ def name(self) -> str:
"""A name of this mapping, used as an identifier in the
indexer storage."""
raise NotImplementedError(f"{self.__class__.__name__}.name")
@classmethod
- def detect_metadata_files(cls, files):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
"""
Detects files potentially containing metadata
Args:
- file_entries (list): list of files
+ file_entries: list of files
Returns:
list: list of sha1 (possibly empty)
"""
raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")
- def translate(self, file_content):
+ @classmethod
+ def supported_terms(cls) -> Iterable[str]:
+ """Returns all CodeMeta terms this mapping supports"""
+ raise NotImplementedError(f"{cls.__name__}.supported_terms")
+
+ def translate(self, file_content: bytes) -> Optional[Dict]:
raise NotImplementedError(f"{self.__class__.__name__}.translate")
- def normalize_translation(self, metadata):
+ def normalize_translation(self, metadata: Dict) -> Dict:
return compact(metadata)
@@ -55,14 +74,14 @@
"""Base class for all mappings that use a single file as input."""
@property
- def filename(self):
+ def filename(self) -> bytes:
"""The .json file to extract metadata from."""
raise NotImplementedError(f"{self.__class__.__name__}.filename")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
for entry in file_entries:
- if entry["name"].lower() == cls.filename.lower():
+ if entry["name"].lower() == cls.filename.lower(): # type: ignore
return [entry["sha1"]]
return []
@@ -71,36 +90,36 @@
"""Base class for mappings that take as input a file that is mostly
a key-value store (eg. a shallow JSON dict)."""
- string_fields = [] # type: List[str]
+ string_fields: List[str] = []
"""List of fields that are simple strings, and don't need any
normalization."""
@property
- def mapping(self):
+ def mapping(self) -> Dict[str, str]:
"""A translation dict to map dict keys into a canonical name."""
raise NotImplementedError(f"{self.__class__.__name__}.mapping")
@staticmethod
- def _normalize_method_name(name):
+ def _normalize_method_name(name: str) -> str:
return name.replace("-", "_")
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> Iterable[str]:
return {
term
- for (key, term) in cls.mapping.items()
+ for (key, term) in cls.mapping.items() # type: ignore
if key in cls.string_fields
or hasattr(cls, "translate_" + cls._normalize_method_name(key))
or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
}
- def _translate_dict(self, content_dict, *, normalize=True):
+ def _translate_dict(self, content_dict: Dict, *, normalize: bool = True) -> Dict:
"""
Translates content by parsing content from a dict object
and translating with the appropriate mapping
Args:
- content_dict (dict): content dict to translate
+ content_dict: content dict to translate
Returns:
dict: translated metadata in json-friendly form needed for
@@ -150,13 +169,13 @@
class JsonMapping(DictMapping, SingleFileMapping):
"""Base class for all mappings that use a JSON file as input."""
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Optional[Dict]:
"""
Translates content by parsing content from a bytestring containing
json data and translating with the appropriate mapping
Args:
- raw_content (bytes): raw content to translate
+ raw_content: raw content to translate
Returns:
dict: translated metadata in json-friendly form needed for
@@ -164,14 +183,15 @@
"""
try:
- raw_content = raw_content.decode()
+ content: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
try:
- content_dict = json.loads(raw_content)
+ content_dict = json.loads(content)
except json.JSONDecodeError:
self.log.warning("Error unjsoning from %s", self.log_suffix)
- return
+ return None
if isinstance(content_dict, dict):
return self._translate_dict(content_dict)
+ return None
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,3 +1,5 @@
+from typing import Any, Dict, List, Optional
+
import yaml
from swh.indexer.codemeta import CODEMETA_CONTEXT_URL, CROSSWALK_TABLE, SCHEMA_URI
@@ -18,19 +20,19 @@
mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
string_fields = ["keywords", "license", "abstract", "version", "doi"]
- def translate(self, raw_content):
- raw_content = raw_content.decode()
- content_dict = yaml.load(raw_content, Loader=yaml.SafeLoader)
+ def translate(self, raw_content: bytes) -> Dict:
+ content: str = raw_content.decode()
+ content_dict = yaml.load(content, Loader=yaml.SafeLoader)
metadata = self._translate_dict(content_dict)
metadata["@context"] = CODEMETA_CONTEXT_URL
return metadata
- def normalize_authors(self, d):
- result = []
+ def normalize_authors(self, d) -> Dict[str, Any]:
+ result: List[Dict[str, Any]] = []
for author in d:
- author_data = {"@type": SCHEMA_URI + "Person"}
+ author_data: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"}
if "orcid" in author:
author_data["@id"] = author["orcid"]
if "affiliation" in author:
@@ -45,21 +47,24 @@
result.append(author_data)
- result = {"@list": result}
- return result
+ return {"@list": result}
- def normalize_doi(self, s):
+ def normalize_doi(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://doi.org/" + s}
+ return None
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
+ return None
- def normalize_repository_code(self, s):
+ def normalize_repository_code(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_date_released(self, s):
+ def normalize_date_released(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@value": s, "@type": SCHEMA_URI + "Date"}
+ return None
diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py
--- a/swh/indexer/metadata_dictionary/codemeta.py
+++ b/swh/indexer/metadata_dictionary/codemeta.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import json
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CODEMETA_TERMS, expand
@@ -20,10 +21,10 @@
string_fields = None
@classmethod
- def supported_terms(cls):
+ def supported_terms(cls) -> List[str]:
return [term for term in CODEMETA_TERMS if not term.startswith("@")]
- def translate(self, content):
+ def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
try:
return self.normalize_translation(expand(json.loads(content.decode())))
except Exception:
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import os
+from typing import Any, Dict, List, Optional
import xml.parsers.expat
import xmltodict
@@ -46,7 +47,7 @@
_default_repository = {"url": "https://repo.maven.apache.org/maven2/"}
- def parse_repositories(self, d):
+ def parse_repositories(self, d: Dict) -> Optional[List[Optional[Dict[str, Any]]]]:
"""https://maven.apache.org/pom.html#Repositories
>>> import xmltodict
@@ -75,11 +76,11 @@
results = []
return [res for res in results if res] or None
- def parse_repository(self, d, repo):
+ def parse_repository(self, d: Dict, repo) -> Optional[Dict[str, Any]]:
if not isinstance(repo, dict):
- return
+ return None
if repo.get("layout", "default") != "default":
- return # TODO ?
+ return None # TODO ?
url = repo.get("url")
group_id = d.get("groupId")
artifact_id = d.get("artifactId")
@@ -90,8 +91,9 @@
):
repo = os.path.join(url, *group_id.split("."), artifact_id)
return {"@id": repo}
+ return None
- def normalize_groupId(self, id_):
+ def normalize_groupId(self, id_) -> Optional[Dict[str, Any]]:
"""https://maven.apache.org/pom.html#Maven_Coordinates
>>> MavenMapping().normalize_groupId('org.example')
@@ -99,8 +101,9 @@
"""
if isinstance(id_, str):
return {"@id": id_}
+ return None
- def parse_licenses(self, d):
+ def parse_licenses(self, d) -> Optional[List[Dict[str, Any]]]:
"""https://maven.apache.org/pom.html#Licenses
>>> import xmltodict
@@ -148,12 +151,12 @@
licenses = d.get("licenses")
if not isinstance(licenses, dict):
- return
+ return None
licenses = licenses.get("license")
if isinstance(licenses, dict):
licenses = [licenses]
elif not isinstance(licenses, list):
- return
+ return None
return [
{"@id": license["url"]}
for license in licenses
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -4,6 +4,7 @@
# See top-level LICENSE file for more information
import re
+from typing import Any, Dict, List, Optional
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -29,7 +30,7 @@
# 'bitbucket': 'https://bitbucket.org/',
}
- def normalize_repository(self, d):
+ def normalize_repository(self, d) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#repository
>>> NpmMapping().normalize_repository({
@@ -67,7 +68,7 @@
return {"@id": url}
- def normalize_bugs(self, d):
+ def normalize_bugs(self, d) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#bugs
>>> NpmMapping().normalize_bugs({
@@ -90,7 +91,7 @@
r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$"
)
- def normalize_author(self, d):
+ def normalize_author(self, d) -> Optional[Dict[str, Any]]:
"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'
>>> from pprint import pprint
@@ -111,7 +112,7 @@
'http://schema.org/name': 'John Doe',
'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
""" # noqa
- author = {"@type": SCHEMA_URI + "Person"}
+ author: Dict[str, Any] = {"@type": SCHEMA_URI + "Person"}
if isinstance(d, dict):
name = d.get("name", None)
email = d.get("email", None)
@@ -133,7 +134,7 @@
author[SCHEMA_URI + "url"] = {"@id": url}
return {"@list": [author]}
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#license
>>> NpmMapping().normalize_license('MIT')
@@ -141,8 +142,9 @@
"""
if isinstance(s, str):
return {"@id": "https://spdx.org/licenses/" + s}
+ return None
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s) -> Optional[Dict[str, str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
@@ -150,8 +152,9 @@
"""
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_keywords(self, lst):
+ def normalize_keywords(self, lst: List) -> Optional[List[str]]:
"""https://docs.npmjs.com/files/package.json#homepage
>>> NpmMapping().normalize_keywords(['foo', 'bar'])
@@ -159,3 +162,4 @@
"""
if isinstance(lst, list):
return [x for x in lst if isinstance(x, str)]
+ return None
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -6,6 +6,7 @@
import email.parser
import email.policy
import itertools
+from typing import Dict, List
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
@@ -44,9 +45,9 @@
_parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())
- def translate(self, content):
+ def translate(self, content: bytes) -> Dict:
msg = self._parser.parsebytes(content)
- d = {}
+ d: Dict[str, List[str]] = {}
for (key, value) in msg.items():
key = _normalize_pkginfo_key(key)
if value != "UNKNOWN":
@@ -66,11 +67,11 @@
}
return self.normalize_translation(metadata)
- def normalize_home_page(self, urls):
+ def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]:
return [{"@id": url} for url in urls]
- def normalize_keywords(self, keywords):
+ def normalize_keywords(self, keywords: List[str]) -> List[str]:
return list(itertools.chain.from_iterable(s.split(" ") for s in keywords))
- def normalize_license(self, licenses):
+ def normalize_license(self, licenses: str) -> List[Dict[str, str]]:
return [{"@id": license} for license in licenses]
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -6,13 +6,14 @@
import ast
import itertools
import re
+from typing import Any, Dict, List, Optional, Union
from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
-from .base import DictMapping
+from .base import DictMapping, FileEntry
-def name_to_person(name):
+def name_to_person(name: str) -> Dict[str, str]:
return {
"@type": SCHEMA_URI + "Person",
SCHEMA_URI + "name": name,
@@ -28,29 +29,29 @@
_re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")
@classmethod
- def detect_metadata_files(cls, file_entries):
+ def detect_metadata_files(cls, file_entries: List[FileEntry]) -> List[bytes]:
for entry in file_entries:
if entry["name"].endswith(b".gemspec"):
return [entry["sha1"]]
return []
- def translate(self, raw_content):
+ def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
try:
- raw_content = raw_content.decode()
+ content: str = raw_content.decode()
except UnicodeDecodeError:
self.log.warning("Error unidecoding from %s", self.log_suffix)
- return
+ return None
# Skip lines before 'Gem::Specification.new'
lines = itertools.dropwhile(
- lambda x: not self._re_spec_new.match(x), raw_content.split("\n")
+ lambda x: not self._re_spec_new.match(x), content.split("\n")
)
try:
next(lines) # Consume 'Gem::Specification.new'
except StopIteration:
self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
- return
+ return None
content_dict = {}
for line in lines:
@@ -61,7 +62,7 @@
content_dict[match.group("key")] = value
return self._translate_dict(content_dict)
- def eval_ruby_expression(self, expr):
+ def eval_ruby_expression(self, expr: str) -> Optional[Union[str, List]]:
"""Very simple evaluator of Ruby expressions.
>>> GemspecMapping().eval_ruby_expression('"Foo bar"')
@@ -97,31 +98,36 @@
# of such strings).
tree = ast.parse(expr, mode="eval")
except (SyntaxError, ValueError):
- return
+ return None
if isinstance(tree, ast.Expression):
return evaluator(tree.body)
+ return None
- def normalize_homepage(self, s):
+ def normalize_homepage(self, s) -> Optional[Dict[str, str]]:
if isinstance(s, str):
return {"@id": s}
+ return None
- def normalize_license(self, s):
+ def normalize_license(self, s) -> Optional[List[Dict[str, str]]]:
if isinstance(s, str):
return [{"@id": "https://spdx.org/licenses/" + s}]
+ return None
- def normalize_licenses(self, licenses):
+ def normalize_licenses(self, licenses) -> Optional[List[Dict[str, str]]]:
if isinstance(licenses, list):
return [
{"@id": "https://spdx.org/licenses/" + license}
for license in licenses
if isinstance(license, str)
]
+ return None
- def normalize_author(self, author):
+ def normalize_author(self, author) -> Optional[Dict[str, Any]]:
if isinstance(author, str):
return {"@list": [name_to_person(author)]}
+ return None
- def normalize_authors(self, authors):
+ def normalize_authors(self, authors) -> Optional[Dict[str, List[Dict[str, Any]]]]:
if isinstance(authors, list):
return {
"@list": [
@@ -130,3 +136,4 @@
if isinstance(author, str)
]
}
+ return None

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 5:07 AM (8 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223291

Event Timeline