Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/metadata_dictionary/python.py
# Copyright (C) 2018-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import email.parser
import email.policy
import itertools
from typing import Any, Dict, List

from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI

from .base import DictMapping, SingleFileMapping
_normalize_pkginfo_key = str.lower | _normalize_pkginfo_key = str.lower | ||||
class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy):
    """Email policy whose fetched header values keep their line breaks.

    Only :meth:`header_fetch_parse` is overridden; everything else is
    inherited from :class:`email.policy.EmailPolicy`.
    """

    def header_fetch_parse(self, name: str, value: str) -> str:
        """Return the header object for ``value``, keeping its line breaks.

        Args:
            name: header name, passed through to ``header_factory``.
            value: raw header value, or an object that already carries a
                ``name`` attribute.

        Returns:
            ``value`` itself when it already has a ``name`` attribute;
            otherwise the result of ``self.header_factory(name, value)``
            after the single space following each line break was removed.
        """
        if hasattr(value, "name"):
            # Nothing to parse: hand the object back untouched.
            return value
        # Drop the one space that follows each line break, but keep the
        # break itself so multi-line values stay multi-line.
        value = value.replace("\n ", "\n")
        return self.header_factory(name, value)  # type: ignore
class PythonPkginfoMapping(DictMapping, SingleFileMapping):
    """Dedicated class for Python's PKG-INFO mapping and translation.

    https://www.python.org/dev/peps/pep-0314/"""

    name = "pkg-info"
    filename = b"PKG-INFO"
    # Crosswalk table re-keyed with normalized (lower-cased) header names, so
    # it can be looked up with the keys built in translate().
    mapping = {
        _normalize_pkginfo_key(k): v
        for (k, v) in CROSSWALK_TABLE["Python PKG-INFO"].items()
    }
    string_fields = [
        "name",
        "version",
        "description",
        "summary",
        "author",
        "author-email",
    ]

    # One parser shared by all instances; it only reads the header section.
    _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())

    def translate(self, content: bytes) -> Dict[str, Any]:
        """Translate the raw bytes of a PKG-INFO file.

        Args:
            content: raw content of the PKG-INFO file.

        Returns:
            Whatever ``self.normalize_translation`` returns for the
            translated metadata.
        """
        msg = self._parser.parsebytes(content)
        # A header may appear several times, so collect values in lists.
        d: Dict[str, List[str]] = {}
        for (key, value) in msg.items():
            key = _normalize_pkginfo_key(key)
            # "UNKNOWN" is treated as an absent value and skipped.
            if value != "UNKNOWN":
                d.setdefault(key, []).append(value)
        metadata = self._translate_dict(d, normalize=False)
        if SCHEMA_URI + "author" in metadata or SCHEMA_URI + "email" in metadata:
            # Fold the flat author / email entries into a single Person node.
            # Both entries are popped while the right-hand side is built, so
            # the top-level email key disappears and author is replaced.
            # Only the first value of each is kept; a missing one is None.
            metadata[SCHEMA_URI + "author"] = {
                "@list": [
                    {
                        "@type": SCHEMA_URI + "Person",
                        SCHEMA_URI
                        + "name": metadata.pop(SCHEMA_URI + "author", [None])[0],
                        SCHEMA_URI
                        + "email": metadata.pop(SCHEMA_URI + "email", [None])[0],
                    }
                ]
            }
        return self.normalize_translation(metadata)

    def normalize_home_page(self, urls: List[str]) -> List[Dict[str, str]]:
        """Wrap each URL in an ``{"@id": url}`` node."""
        return [{"@id": url} for url in urls]

    def normalize_keywords(self, keywords: List[str]) -> List[str]:
        """Split every keyword string on spaces and flatten the result.

        Empty fragments (from consecutive, leading or trailing spaces) are
        dropped so that no empty-string keyword is emitted.
        """
        return [
            keyword
            for s in keywords
            for keyword in s.split(" ")
            if keyword
        ]

    def normalize_license(self, licenses: List[str]) -> List[Dict[str, str]]:
        """Wrap each license value in an ``{"@id": license}`` node."""
        return [{"@id": license} for license in licenses]