diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py
index 6c4ef58..022c9bd 100644
--- a/swh/indexer/codemeta.py
+++ b/swh/indexer/codemeta.py
@@ -1,220 +1,217 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import collections
 import csv
 import itertools
 import json
 import os.path
 import re
 from typing import Any, List

 from pyld import jsonld

 import swh.indexer
+from swh.indexer.namespaces import ACTIVITYSTREAMS, CODEMETA, FORGEFED, SCHEMA

 _DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), "data")

 CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, "codemeta", "crosswalk.csv")

 CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta.jsonld")

 with open(CODEMETA_CONTEXT_PATH) as fd:
     CODEMETA_CONTEXT = json.load(fd)

 _EMPTY_PROCESSED_CONTEXT: Any = {"mappings": {}}
 _PROCESSED_CODEMETA_CONTEXT = jsonld.JsonLdProcessor().process_context(
     _EMPTY_PROCESSED_CONTEXT, CODEMETA_CONTEXT, None
 )

 CODEMETA_CONTEXT_URL = "https://doi.org/10.5063/schema/codemeta-2.0"
 CODEMETA_ALTERNATE_CONTEXT_URLS = {
     ("https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld")
 }
-CODEMETA_URI = "https://codemeta.github.io/terms/"
-SCHEMA_URI = "http://schema.org/"
-FORGEFED_URI = "https://forgefed.org/ns#"
-ACTIVITYSTREAMS_URI = "https://www.w3.org/ns/activitystreams#"

 PROPERTY_BLACKLIST = {
     # CodeMeta properties that we cannot properly represent.
-    SCHEMA_URI + "softwareRequirements",
-    CODEMETA_URI + "softwareSuggestions",
+    SCHEMA.softwareRequirements,
+    CODEMETA.softwareSuggestions,
     # Duplicate of 'author'
-    SCHEMA_URI + "creator",
+    SCHEMA.creator,
 }

 _codemeta_field_separator = re.compile(r"\s*[,/]\s*")


 def make_absolute_uri(local_name):
     """Parses codemeta.jsonld, and returns the @id of terms it defines.

     >>> make_absolute_uri("name")
     'http://schema.org/name'
     >>> make_absolute_uri("downloadUrl")
     'http://schema.org/downloadUrl'
     >>> make_absolute_uri("referencePublication")
     'https://codemeta.github.io/terms/referencePublication'
     """
     uri = jsonld.JsonLdProcessor.get_context_value(
         _PROCESSED_CODEMETA_CONTEXT, local_name, "@id"
     )
-    assert uri.startswith(("@", CODEMETA_URI, SCHEMA_URI)), (local_name, uri)
+    assert uri.startswith(("@", CODEMETA._uri, SCHEMA._uri)), (local_name, uri)
     return uri


 def _read_crosstable(fd):
     reader = csv.reader(fd)
     try:
         header = next(reader)
     except StopIteration:
         raise ValueError("empty file")

     data_sources = set(header) - {"Parent Type", "Property", "Type", "Description"}
     codemeta_translation = {data_source: {} for data_source in data_sources}
     terms = set()

     for line in reader:  # For each canonical name
         local_name = dict(zip(header, line))["Property"]
         if not local_name:
             continue
         canonical_name = make_absolute_uri(local_name)
         if canonical_name in PROPERTY_BLACKLIST:
             continue
         terms.add(canonical_name)
         for (col, value) in zip(header, line):  # For each cell in the row
             if col in data_sources:
                 # If that's not the parentType/property/type/description
                 for local_name in _codemeta_field_separator.split(value):
                     # For each of the data source's properties that maps
                     # to this canonical name
                     if local_name.strip():
                         codemeta_translation[col][local_name.strip()] = canonical_name

     return (terms, codemeta_translation)


 with open(CROSSWALK_TABLE_PATH) as fd:
     (CODEMETA_TERMS, CROSSWALK_TABLE) = _read_crosstable(fd)


 def _document_loader(url, options=None):
     """Document loader for pyld.

     Reads the local codemeta.jsonld file instead of fetching it
     from the Internet every single time."""
     if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS:
         return {
             "contextUrl": None,
             "documentUrl": url,
             "document": CODEMETA_CONTEXT,
         }
-    elif url == CODEMETA_URI:
+    elif url == CODEMETA._uri:
         raise Exception(
             "{} is CodeMeta's URI, use {} as context url".format(
-                CODEMETA_URI, CODEMETA_CONTEXT_URL
+                CODEMETA._uri, CODEMETA_CONTEXT_URL
             )
         )
     else:
         raise Exception(url)


 def compact(doc, forgefed: bool):
     """Same as `pyld.jsonld.compact`, but in the context of CodeMeta.

     Args:
         forgefed: Whether to add ForgeFed and ActivityStreams as compact URIs.
             This is typically used for extrinsic metadata documents, which
             frequently use properties from these namespaces.
     """
     contexts: List[Any] = [CODEMETA_CONTEXT_URL]
     if forgefed:
-        contexts.append({"as": ACTIVITYSTREAMS_URI, "forge": FORGEFED_URI})
+        contexts.append({"as": ACTIVITYSTREAMS._uri, "forge": FORGEFED._uri})
     return jsonld.compact(doc, contexts, options={"documentLoader": _document_loader})


 def expand(doc):
     """Same as `pyld.jsonld.expand`, but in the context of CodeMeta."""
     return jsonld.expand(doc, options={"documentLoader": _document_loader})


 def merge_values(v1, v2):
     """If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`,
     returns `{"@list": l1 + l2}`.
     Otherwise, make them lists (if they are not already) and concatenate
     them.

     >>> merge_values('a', 'b')
     ['a', 'b']
     >>> merge_values(['a', 'b'], 'c')
     ['a', 'b', 'c']
     >>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']})
     {'@list': ['a', 'b', 'c']}
     """
     if v1 is None:
         return v2
     elif v2 is None:
         return v1
     elif isinstance(v1, dict) and set(v1) == {"@list"}:
         assert isinstance(v1["@list"], list)
         if isinstance(v2, dict) and set(v2) == {"@list"}:
             assert isinstance(v2["@list"], list)
             return {"@list": v1["@list"] + v2["@list"]}
         else:
             raise ValueError("Cannot merge %r and %r" % (v1, v2))
     else:
         if isinstance(v2, dict) and "@list" in v2:
             raise ValueError("Cannot merge %r and %r" % (v1, v2))
         if not isinstance(v1, list):
             v1 = [v1]
         if not isinstance(v2, list):
             v2 = [v2]
         return v1 + v2


 def merge_documents(documents):
     """Takes a list of metadata dicts, each generated from a different
     metadata file, and merges them.

     Removes duplicates, if any."""
     documents = list(itertools.chain.from_iterable(map(expand, documents)))
     merged_document = collections.defaultdict(list)
     for document in documents:
         for (key, values) in document.items():
             if key == "@id":
                 # @id does not get expanded to a list
                 value = values

                 # Only one @id is allowed, move it to sameAs
                 if "@id" not in merged_document:
                     merged_document["@id"] = value
                 elif value != merged_document["@id"]:
-                    if value not in merged_document[SCHEMA_URI + "sameAs"]:
-                        merged_document[SCHEMA_URI + "sameAs"].append(value)
+                    if value not in merged_document[SCHEMA.sameAs]:
+                        merged_document[SCHEMA.sameAs].append(value)
             else:
                 for value in values:
                     if isinstance(value, dict) and set(value) == {"@list"}:
                         # Value is of the form {'@list': [item1, item2]}
                         # instead of the usual [item1, item2].
                         # We need to merge the inner lists (and mostly
                         # preserve order).
                         merged_value = merged_document.setdefault(key, {"@list": []})
                         for subvalue in value["@list"]:
                             # merged_value must be of the form
                             # {'@list': [item1, item2]}; as it is the same
                             # type as value, which is an @list.
                             if subvalue not in merged_value["@list"]:
                                 merged_value["@list"].append(subvalue)
                     elif value not in merged_document[key]:
                         merged_document[key].append(value)

     # XXX: we should set forgefed=True when merging extrinsic_metadata documents.
     # however, this function is only used to merge multiple files of the same
     # directory (which is only for intrinsic-metadata), so it is not an issue for now
     return compact(merged_document, forgefed=False)
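
Note for reviewers: the behavior of merge_documents is unchanged by this refactoring; only the sameAs key is now spelled SCHEMA.sameAs. A quick way to sanity-check it locally, as a sketch (it assumes a checkout where the bundled codemeta.jsonld is available, and hedges on the exact output of compact):

    from swh.indexer.codemeta import merge_documents

    # Two documents describing the same project: the duplicate name should
    # be deduplicated, and both keywords kept.
    doc1 = {"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
            "name": "foo", "keywords": "k1"}
    doc2 = {"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
            "name": "foo", "keywords": "k2"}
    print(merge_documents([doc1, doc2]))
    # expected shape (not byte-for-byte):
    # {'@context': ..., 'name': 'foo', 'keywords': ['k1', 'k2']}
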
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 2ac4adc..0cabaf4 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,270 +1,269 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar

 from typing_extensions import TypedDict
 import yaml

-from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+from swh.indexer.codemeta import compact, merge_values
+from swh.indexer.namespaces import SCHEMA
 from swh.indexer.storage.interface import Sha1


 class DirectoryLsEntry(TypedDict):
     target: Sha1
     sha1: Sha1
     name: bytes
     type: str


 TTranslateCallable = TypeVar(
     "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None]
 )


-def produce_terms(
-    namespace: str, terms: List[str]
-) -> Callable[[TTranslateCallable], TTranslateCallable]:
+def produce_terms(*uris: str) -> Callable[[TTranslateCallable], TTranslateCallable]:
     """Returns a decorator that marks the decorated function as adding
     the given terms to the ``translated_metadata`` dict"""

     def decorator(f: TTranslateCallable) -> TTranslateCallable:
         if not hasattr(f, "produced_terms"):
             f.produced_terms = []  # type: ignore
-        f.produced_terms.extend(namespace + term for term in terms)  # type: ignore
+        f.produced_terms.extend(uris)  # type: ignore
         return f

     return decorator


 class BaseMapping:
     """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`,
     not to be inherited directly."""

     def __init__(self, log_suffix=""):
         self.log_suffix = log_suffix
         self.log = logging.getLogger(
             "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
         )

     @property
     def name(self):
         """A name of this mapping, used as an identifier in the
         indexer storage."""
         raise NotImplementedError(f"{self.__class__.__name__}.name")

     def translate(self, file_content: bytes) -> Optional[Dict]:
         """Translates metadata, from the content of a file or of a RawExtrinsicMetadata
         object."""
         raise NotImplementedError(f"{self.__class__.__name__}.translate")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         raise NotImplementedError(f"{self.__class__.__name__}.normalize_translation")


 class BaseExtrinsicMapping(BaseMapping):
     """Base class for extrinsic_metadata mappings to inherit from

     To implement a new mapping:

     - inherit this class
     - override translate function
     """

     @classmethod
     def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
         """
         Returns the list of extrinsic metadata formats which can be translated
         by this mapping
         """
         raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata, forgefed=True)


 class BaseIntrinsicMapping(BaseMapping):
     """Base class for intrinsic-metadata mappings to inherit from

     To implement a new mapping:

     - inherit this class
     - override translate function
     """

     @classmethod
     def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         """
         Returns the sha1 hashes of files which can be translated by this mapping
         """
         raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata, forgefed=False)


 class SingleFileIntrinsicMapping(BaseIntrinsicMapping):
     """Base class for all intrinsic metadata mappings that use a single file as input."""

     @property
     def filename(self):
         """The .json file to extract metadata from."""
         raise NotImplementedError(f"{self.__class__.__name__}.filename")

     @classmethod
     def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         for entry in file_entries:
             if entry["name"].lower() == cls.filename:
                 return [entry["sha1"]]
         return []


 class DictMapping(BaseMapping):
     """Base class for mappings that take as input a file that is mostly
     a key-value store (eg. a shallow JSON dict)."""

     string_fields = []  # type: List[str]
     """List of fields that are simple strings, and don't need any
     normalization."""

     @property
     def mapping(self):
         """A translation dict to map dict keys into a canonical name."""
         raise NotImplementedError(f"{self.__class__.__name__}.mapping")

     @staticmethod
     def _normalize_method_name(name: str) -> str:
         return name.replace("-", "_")

     @classmethod
     def supported_terms(cls):
         # one-to-one mapping from the original key to a CodeMeta term
         simple_terms = {
             term
             for (key, term) in cls.mapping.items()
             if key in cls.string_fields
             or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
         }

         # more complex mapping from the original key to JSON-LD
         complex_terms = {
             term
             for meth_name in dir(cls)
             if meth_name.startswith("translate_")
             for term in getattr(getattr(cls, meth_name), "produced_terms", [])
         }

         return simple_terms | complex_terms

     def _translate_dict(
         self, content_dict: Dict, *, normalize: bool = True
     ) -> Dict[str, str]:
         """
         Translates content by parsing content from a dict object
         and translating with the appropriate mapping

         Args:
             content_dict (dict): content dict to translate

         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer
         """
-        translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}
+        translated_metadata = {"@type": SCHEMA.SoftwareSourceCode}

         for k, v in content_dict.items():
             # First, check if there is a specific translation
             # method for this key
             translation_method = getattr(
                 self, "translate_" + self._normalize_method_name(k), None
             )
             if translation_method:
                 translation_method(translated_metadata, v)
             elif k in self.mapping:
                 # if there is no method, but the key is known from the
                 # crosswalk table
                 codemeta_key = self.mapping[k]

                 # if there is a normalization method, use it on the value
                 normalization_method = getattr(
                     self, "normalize_" + self._normalize_method_name(k), None
                 )
                 if normalization_method:
                     v = normalization_method(v)
                 elif k in self.string_fields and isinstance(v, str):
                     pass
                 elif k in self.string_fields and isinstance(v, list):
                     v = [x for x in v if isinstance(x, str)]
                 else:
                     continue

                 # set the translation metadata with the normalized value
                 if codemeta_key in translated_metadata:
                     translated_metadata[codemeta_key] = merge_values(
                         translated_metadata[codemeta_key], v
                     )
                 else:
                     translated_metadata[codemeta_key] = v

         if normalize:
             return self.normalize_translation(translated_metadata)
         else:
             return translated_metadata


 class JsonMapping(DictMapping):
     """Base class for all mappings that use JSON data as input."""

     def translate(self, raw_content: bytes) -> Optional[Dict]:
         """
         Translates content by parsing content from a bytestring containing
         json data and translating with the appropriate mapping

         Args:
             raw_content (bytes): raw content to translate

         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer
         """
         try:
             raw_content_string: str = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
             return None
         try:
             content_dict = json.loads(raw_content_string)
         except json.JSONDecodeError:
             self.log.warning("Error unjsoning from %s", self.log_suffix)
             return None
         if isinstance(content_dict, dict):
             return self._translate_dict(content_dict)
         return None


 class SafeLoader(yaml.SafeLoader):
     yaml_implicit_resolvers = {
         k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"]
         for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items()
     }


 class YamlMapping(DictMapping, SingleFileIntrinsicMapping):
     """Base class for all mappings that use Yaml data as input."""

     def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]:
         raw_content_string: str = raw_content.decode()
         try:
             content_dict = yaml.load(raw_content_string, Loader=SafeLoader)
         except yaml.scanner.ScannerError:
             return None

         if isinstance(content_dict, dict):
             return self._translate_dict(content_dict)

         return None
diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py
index 286ec77..3869ece 100644
--- a/swh/indexer/metadata_dictionary/cff.py
+++ b/swh/indexer/metadata_dictionary/cff.py
@@ -1,53 +1,54 @@
 from typing import Dict, List, Optional, Union

-from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
+from swh.indexer.namespaces import SCHEMA

 from .base import YamlMapping


 class CffMapping(YamlMapping):
     """Dedicated class for Citation (CITATION.cff) mapping and translation"""

     name = "cff"
     filename = b"CITATION.cff"
     mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"]
     string_fields = ["keywords", "license", "abstract", "version", "doi"]

     def normalize_authors(self, d: List[dict]) -> Dict[str, list]:
         result = []
         for author in d:
             author_data: Dict[str, Optional[Union[str, Dict]]] = {
-                "@type": SCHEMA_URI + "Person"
+                "@type": SCHEMA.Person
             }
             if "orcid" in author and isinstance(author["orcid"], str):
                 author_data["@id"] = author["orcid"]
             if "affiliation" in author and isinstance(author["affiliation"], str):
-                author_data[SCHEMA_URI + "affiliation"] = {
-                    "@type": SCHEMA_URI + "Organization",
-                    SCHEMA_URI + "name": author["affiliation"],
+                author_data[SCHEMA.affiliation] = {
+                    "@type": SCHEMA.Organization,
+                    SCHEMA.name: author["affiliation"],
                 }
             if "family-names" in author and isinstance(author["family-names"], str):
-                author_data[SCHEMA_URI + "familyName"] = author["family-names"]
+                author_data[SCHEMA.familyName] = author["family-names"]
             if "given-names" in author and isinstance(author["given-names"], str):
-                author_data[SCHEMA_URI + "givenName"] = author["given-names"]
+                author_data[SCHEMA.givenName] = author["given-names"]
             result.append(author_data)
         result_final = {"@list": result}
         return result_final

     def normalize_doi(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": "https://doi.org/" + s}

     def normalize_license(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}

     def normalize_repository_code(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
             return {"@id": s}

     def normalize_date_released(self, s: str) -> Dict[str, str]:
         if isinstance(s, str):
-            return {"@value": s, "@type": SCHEMA_URI + "Date"}
+            return {"@value": s, "@type": SCHEMA.Date}
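
Note for reviewers: produce_terms now takes full term URIs instead of a (namespace, local-names) pair, which reads more naturally with the new _Namespace objects. A minimal sketch of the new calling convention, using a hypothetical mapping class:

    from swh.indexer.metadata_dictionary.base import produce_terms
    from swh.indexer.namespaces import SCHEMA

    class MyMapping:  # hypothetical, for illustration only
        @produce_terms(SCHEMA.dateCreated)
        def translate_created(self, translated_metadata, v):
            translated_metadata.setdefault(SCHEMA.dateCreated, []).append(v)

    # The decorator records the produced URIs on the function itself,
    # which supported_terms() later collects:
    print(MyMapping.translate_created.produced_terms)
    # ['http://schema.org/dateCreated']
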
diff --git a/swh/indexer/metadata_dictionary/composer.py b/swh/indexer/metadata_dictionary/composer.py
index c02f5d8..7cfa8b7 100644
--- a/swh/indexer/metadata_dictionary/composer.py
+++ b/swh/indexer/metadata_dictionary/composer.py
@@ -1,56 +1,57 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os.path

-from swh.indexer.codemeta import _DATA_DIR, SCHEMA_URI, _read_crosstable
+from swh.indexer.codemeta import _DATA_DIR, _read_crosstable
+from swh.indexer.namespaces import SCHEMA

 from .base import JsonMapping, SingleFileIntrinsicMapping

 COMPOSER_TABLE_PATH = os.path.join(_DATA_DIR, "composer.csv")

 with open(COMPOSER_TABLE_PATH) as fd:
     (CODEMETA_TERMS, COMPOSER_TABLE) = _read_crosstable(fd)


 class ComposerMapping(JsonMapping, SingleFileIntrinsicMapping):
     """Dedicated class for Packagist (composer.json) mapping and translation"""

     name = "composer"
     mapping = COMPOSER_TABLE["Composer"]
     filename = b"composer.json"
     string_fields = [
         "name",
         "description",
         "version",
         "keywords",
         "homepage",
         "license",
         "author",
         "authors",
     ]

     def normalize_homepage(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def normalize_license(self, s):
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}

     def normalize_authors(self, author_list):
         authors = []
         for author in author_list:
-            author_obj = {"@type": SCHEMA_URI + "Person"}
+            author_obj = {"@type": SCHEMA.Person}

             if isinstance(author, dict):
                 if isinstance(author.get("name", None), str):
-                    author_obj[SCHEMA_URI + "name"] = author.get("name", None)
+                    author_obj[SCHEMA.name] = author.get("name", None)
                 if isinstance(author.get("email", None), str):
-                    author_obj[SCHEMA_URI + "email"] = author.get("email", None)
+                    author_obj[SCHEMA.email] = author.get("email", None)

             authors.append(author_obj)

         return {"@list": authors}
diff --git a/swh/indexer/metadata_dictionary/dart.py b/swh/indexer/metadata_dictionary/dart.py
index 26cd7d5..2c72459 100644
--- a/swh/indexer/metadata_dictionary/dart.py
+++ b/swh/indexer/metadata_dictionary/dart.py
@@ -1,74 +1,75 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os.path
 import re

-from swh.indexer.codemeta import _DATA_DIR, SCHEMA_URI, _read_crosstable
+from swh.indexer.codemeta import _DATA_DIR, _read_crosstable
+from swh.indexer.namespaces import SCHEMA

 from .base import YamlMapping

 PUB_TABLE_PATH = os.path.join(_DATA_DIR, "pubspec.csv")

 with open(PUB_TABLE_PATH) as fd:
     (CODEMETA_TERMS, PUB_TABLE) = _read_crosstable(fd)


 def name_to_person(name):
     return {
-        "@type": SCHEMA_URI + "Person",
-        SCHEMA_URI + "name": name,
+        "@type": SCHEMA.Person,
+        SCHEMA.name: name,
     }


 class PubspecMapping(YamlMapping):

     name = "pubspec"
     filename = b"pubspec.yaml"
     mapping = PUB_TABLE["Pubspec"]
     string_fields = [
         "repository",
         "keywords",
         "description",
         "name",
         "homepage",
         "issue_tracker",
         "platforms",
         "license"
         # license will only be used with the SPDX Identifier
     ]

     def normalize_license(self, s):
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}

     def normalize_homepage(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def normalize_author(self, s):
         name_email_regex = "(?P<name>.*?)( <(?P<email>.*)>)"
-        author = {"@type": SCHEMA_URI + "Person"}
+        author = {"@type": SCHEMA.Person}
         if isinstance(s, str):
             match = re.search(name_email_regex, s)
             if match:
                 name = match.group("name")
                 email = match.group("email")
-                author[SCHEMA_URI + "email"] = email
+                author[SCHEMA.email] = email
             else:
                 name = s
-            author[SCHEMA_URI + "name"] = name
+            author[SCHEMA.name] = name
         return {"@list": [author]}

     def normalize_authors(self, authors_list):
         authors = {"@list": []}
         if isinstance(authors_list, list):
             for s in authors_list:
                 author = self.normalize_author(s)["@list"]
                 authors["@list"] += author
         return authors
diff --git a/swh/indexer/metadata_dictionary/github.py b/swh/indexer/metadata_dictionary/github.py
index 020c8d0..1dc67d5 100644
--- a/swh/indexer/metadata_dictionary/github.py
+++ b/swh/indexer/metadata_dictionary/github.py
@@ -1,130 +1,126 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 from typing import Any, Dict, Tuple

-from swh.indexer.codemeta import ACTIVITYSTREAMS_URI, CROSSWALK_TABLE, FORGEFED_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
+from swh.indexer.namespaces import ACTIVITYSTREAMS, FORGEFED

 from .base import BaseExtrinsicMapping, JsonMapping, produce_terms


 def _prettyprint(d):
     print(json.dumps(d, indent=4))


 class GitHubMapping(BaseExtrinsicMapping, JsonMapping):
     name = "github"
     mapping = CROSSWALK_TABLE["GitHub"]
     string_fields = [
         "archive_url",
         "created_at",
         "updated_at",
         "description",
         "full_name",
         "html_url",
         "issues_url",
     ]

     @classmethod
     def extrinsic_metadata_formats(cls) -> Tuple[str, ...]:
         return ("application/vnd.github.v3+json",)

     def _translate_dict(
         self, content_dict: Dict[str, Any], **kwargs
     ) -> Dict[str, Any]:
         d = super()._translate_dict(content_dict, **kwargs)
-        d["type"] = FORGEFED_URI + "Repository"
+        d["type"] = FORGEFED.Repository
         return d

-    @produce_terms(FORGEFED_URI, ["forks"])
-    @produce_terms(ACTIVITYSTREAMS_URI, ["totalItems"])
+    @produce_terms(FORGEFED.forks, ACTIVITYSTREAMS.totalItems)
     def translate_forks_count(
         self, translated_metadata: Dict[str, Any], v: Any
     ) -> None:
         """
         >>> translated_metadata = {}
         >>> GitHubMapping().translate_forks_count(translated_metadata, 42)
         >>> _prettyprint(translated_metadata)
         {
             "https://forgefed.org/ns#forks": [
                 {
                     "@type": "https://www.w3.org/ns/activitystreams#OrderedCollection",
                     "https://www.w3.org/ns/activitystreams#totalItems": 42
                 }
             ]
         }
         """
         if isinstance(v, int):
-            translated_metadata.setdefault(FORGEFED_URI + "forks", []).append(
+            translated_metadata.setdefault(FORGEFED.forks, []).append(
                 {
-                    "@type": ACTIVITYSTREAMS_URI + "OrderedCollection",
-                    ACTIVITYSTREAMS_URI + "totalItems": v,
+                    "@type": ACTIVITYSTREAMS.OrderedCollection,
+                    ACTIVITYSTREAMS.totalItems: v,
                 }
             )

-    @produce_terms(ACTIVITYSTREAMS_URI, ["likes"])
-    @produce_terms(ACTIVITYSTREAMS_URI, ["totalItems"])
+    @produce_terms(ACTIVITYSTREAMS.likes, ACTIVITYSTREAMS.totalItems)
     def translate_stargazers_count(
         self, translated_metadata: Dict[str, Any], v: Any
     ) -> None:
         """
         >>> translated_metadata = {}
         >>> GitHubMapping().translate_stargazers_count(translated_metadata, 42)
         >>> _prettyprint(translated_metadata)
         {
             "https://www.w3.org/ns/activitystreams#likes": [
                 {
                     "@type": "https://www.w3.org/ns/activitystreams#Collection",
                     "https://www.w3.org/ns/activitystreams#totalItems": 42
                 }
             ]
         }
         """
         if isinstance(v, int):
-            translated_metadata.setdefault(ACTIVITYSTREAMS_URI + "likes", []).append(
+            translated_metadata.setdefault(ACTIVITYSTREAMS.likes, []).append(
                 {
-                    "@type": ACTIVITYSTREAMS_URI + "Collection",
-                    ACTIVITYSTREAMS_URI + "totalItems": v,
+                    "@type": ACTIVITYSTREAMS.Collection,
+                    ACTIVITYSTREAMS.totalItems: v,
                 }
             )

-    @produce_terms(ACTIVITYSTREAMS_URI, ["followers"])
-    @produce_terms(ACTIVITYSTREAMS_URI, ["totalItems"])
+    @produce_terms(ACTIVITYSTREAMS.followers, ACTIVITYSTREAMS.totalItems)
     def translate_watchers_count(
         self, translated_metadata: Dict[str, Any], v: Any
     ) -> None:
         """
         >>> translated_metadata = {}
         >>> GitHubMapping().translate_watchers_count(translated_metadata, 42)
         >>> _prettyprint(translated_metadata)
         {
             "https://www.w3.org/ns/activitystreams#followers": [
                 {
                     "@type": "https://www.w3.org/ns/activitystreams#Collection",
                     "https://www.w3.org/ns/activitystreams#totalItems": 42
                 }
             ]
         }
         """
         if isinstance(v, int):
-            translated_metadata.setdefault(
-                ACTIVITYSTREAMS_URI + "followers", []
-            ).append(
+            translated_metadata.setdefault(ACTIVITYSTREAMS.followers, []).append(
                 {
-                    "@type": ACTIVITYSTREAMS_URI + "Collection",
-                    ACTIVITYSTREAMS_URI + "totalItems": v,
+                    "@type": ACTIVITYSTREAMS.Collection,
+                    ACTIVITYSTREAMS.totalItems: v,
                 }
             )

     def normalize_license(self, d):
         """
         >>> GitHubMapping().normalize_license({'spdx_id': 'MIT'})
         {'@id': 'https://spdx.org/licenses/MIT'}
         """
         if isinstance(d, dict) and isinstance(d.get("spdx_id"), str):
             return {"@id": "https://spdx.org/licenses/" + d["spdx_id"]}
diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py
index 419eb74..b9df888 100644
--- a/swh/indexer/metadata_dictionary/maven.py
+++ b/swh/indexer/metadata_dictionary/maven.py
@@ -1,162 +1,163 @@
 # Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 from typing import Any, Dict, Optional
 import xml.parsers.expat

 import xmltodict

-from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
+from swh.indexer.namespaces import SCHEMA

 from .base import DictMapping, SingleFileIntrinsicMapping


 class MavenMapping(DictMapping, SingleFileIntrinsicMapping):
     """
     dedicated class for Maven (pom.xml) mapping and translation
     """

     name = "maven"
     filename = b"pom.xml"
     mapping = CROSSWALK_TABLE["Java (Maven)"]
     string_fields = ["name", "version", "description", "email"]

     def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
         try:
             d = xmltodict.parse(content).get("project") or {}
         except xml.parsers.expat.ExpatError:
             self.log.warning("Error parsing XML from %s", self.log_suffix)
             return None
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding XML from %s", self.log_suffix)
             return None
         except (LookupError, ValueError):
             # unknown encoding or multi-byte encoding
             self.log.warning("Error detecting XML encoding from %s", self.log_suffix)
             return None
         if not isinstance(d, dict):
             self.log.warning("Skipping ill-formed XML content: %s", content)
             return None
         metadata = self._translate_dict(d, normalize=False)
-        metadata[SCHEMA_URI + "codeRepository"] = self.parse_repositories(d)
-        metadata[SCHEMA_URI + "license"] = self.parse_licenses(d)
+        metadata[SCHEMA.codeRepository] = self.parse_repositories(d)
+        metadata[SCHEMA.license] = self.parse_licenses(d)
         return self.normalize_translation(metadata)

     _default_repository = {"url": "https://repo.maven.apache.org/maven2/"}

     def parse_repositories(self, d):
         """https://maven.apache.org/pom.html#Repositories

         >>> import xmltodict
         >>> from pprint import pprint
         >>> d = xmltodict.parse('''
         ... <repositories>
         ...   <repository>
         ...     <id>codehausSnapshots</id>
         ...     <name>Codehaus Snapshots</name>
         ...     <url>http://snapshots.maven.codehaus.org/maven2</url>
         ...     <layout>default</layout>
         ...   </repository>
         ... </repositories>
         ... ''')
         >>> MavenMapping().parse_repositories(d)
         """
         repositories = d.get("repositories")
         if not repositories:
             results = [self.parse_repository(d, self._default_repository)]
         elif isinstance(repositories, dict):
             repositories = repositories.get("repository") or []
             if not isinstance(repositories, list):
                 repositories = [repositories]
             results = [self.parse_repository(d, repo) for repo in repositories]
         else:
             results = []
         return [res for res in results if res] or None

     def parse_repository(self, d, repo):
         if not isinstance(repo, dict):
             return
         if repo.get("layout", "default") != "default":
             return  # TODO ?
         url = repo.get("url")
         group_id = d.get("groupId")
         artifact_id = d.get("artifactId")
         if (
             isinstance(url, str)
             and isinstance(group_id, str)
             and isinstance(artifact_id, str)
         ):
             repo = os.path.join(url, *group_id.split("."), artifact_id)
             return {"@id": repo}

     def normalize_groupId(self, id_):
         """https://maven.apache.org/pom.html#Maven_Coordinates

         >>> MavenMapping().normalize_groupId('org.example')
         {'@id': 'org.example'}
         """
         if isinstance(id_, str):
             return {"@id": id_}

     def parse_licenses(self, d):
         """https://maven.apache.org/pom.html#Licenses

         >>> import xmltodict
         >>> import json
         >>> d = xmltodict.parse('''
         ... <licenses>
         ...   <license>
         ...     <name>Apache License, Version 2.0</name>
         ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         ...   </license>
         ... </licenses>
         ... ''')
         >>> print(json.dumps(d, indent=4))
         {
             "licenses": {
                 "license": {
                     "name": "Apache License, Version 2.0",
                     "url": "https://www.apache.org/licenses/LICENSE-2.0.txt"
                 }
             }
         }
         >>> MavenMapping().parse_licenses(d)
         [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}]

         or, if there are more than one license:

         >>> import xmltodict
         >>> from pprint import pprint
         >>> d = xmltodict.parse('''
         ... <licenses>
         ...   <license>
         ...     <name>Apache License, Version 2.0</name>
         ...     <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
         ...   </license>
         ...   <license>
         ...     <name>MIT License</name>
         ...     <url>https://opensource.org/licenses/MIT</url>
         ...   </license>
         ... </licenses>
         ... ''')
         >>> pprint(MavenMapping().parse_licenses(d))
         [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'},
          {'@id': 'https://opensource.org/licenses/MIT'}]
         """

         licenses = d.get("licenses")
         if not isinstance(licenses, dict):
             return
         licenses = licenses.get("license")
         if isinstance(licenses, dict):
             licenses = [licenses]
         elif not isinstance(licenses, list):
             return
         return [
             {"@id": license["url"]}
             for license in licenses
             if isinstance(license, dict) and isinstance(license.get("url"), str)
         ] or None
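
Note for reviewers: parse_repository derives a repository URL by joining the repository base URL with the groupId path components and the artifactId. A standalone illustration with made-up coordinates:

    import os.path

    url = "https://repo.maven.apache.org/maven2/"
    group_id = "org.example"       # hypothetical coordinates
    artifact_id = "my-artifact"
    print(os.path.join(url, *group_id.split("."), artifact_id))
    # https://repo.maven.apache.org/maven2/org/example/my-artifact  (on POSIX)
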
diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py
index 00231dc..c3e0eef 100644
--- a/swh/indexer/metadata_dictionary/npm.py
+++ b/swh/indexer/metadata_dictionary/npm.py
@@ -1,243 +1,244 @@
 # Copyright (C) 2018-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import re
 import urllib.parse

-from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
+from swh.indexer.namespaces import SCHEMA

 from .base import JsonMapping, SingleFileIntrinsicMapping


 class NpmMapping(JsonMapping, SingleFileIntrinsicMapping):
     """
     dedicated class for NPM (package.json) mapping and translation
     """

     name = "npm"
     mapping = CROSSWALK_TABLE["NodeJS"]
     filename = b"package.json"
     string_fields = ["name", "version", "homepage", "description", "email"]

     _schema_shortcuts = {
         "github": "git+https://github.com/%s.git",
         "gist": "git+https://gist.github.com/%s.git",
         "gitlab": "git+https://gitlab.com/%s.git",
         # Bitbucket supports both hg and git, and the shortcut does not
         # tell which one to use.
         # 'bitbucket': 'https://bitbucket.org/',
     }

     def normalize_repository(self, d):
         """https://docs.npmjs.com/files/package.json#repository

         >>> NpmMapping().normalize_repository({
         ...     'type': 'git',
         ...     'url': 'https://example.org/foo.git'
         ... })
         {'@id': 'git+https://example.org/foo.git'}
         >>> NpmMapping().normalize_repository(
         ...     'gitlab:foo/bar')
         {'@id': 'git+https://gitlab.com/foo/bar.git'}
         >>> NpmMapping().normalize_repository(
         ...     'foo/bar')
         {'@id': 'git+https://github.com/foo/bar.git'}
         """
         if (
             isinstance(d, dict)
             and isinstance(d.get("type"), str)
             and isinstance(d.get("url"), str)
         ):
             url = "{type}+{url}".format(**d)
         elif isinstance(d, str):
             if "://" in d:
                 url = d
             elif ":" in d:
                 (schema, rest) = d.split(":", 1)
                 if schema in self._schema_shortcuts:
                     url = self._schema_shortcuts[schema] % rest
                 else:
                     return None
             else:
                 url = self._schema_shortcuts["github"] % d
         else:
             return None

         return {"@id": url}

     def normalize_bugs(self, d):
         """https://docs.npmjs.com/files/package.json#bugs

         >>> NpmMapping().normalize_bugs({
         ...     'url': 'https://example.org/bugs/',
         ...     'email': 'bugs@example.org'
         ... })
         {'@id': 'https://example.org/bugs/'}
         >>> NpmMapping().normalize_bugs(
         ...     'https://example.org/bugs/')
         {'@id': 'https://example.org/bugs/'}
         """
         if isinstance(d, dict) and isinstance(d.get("url"), str):
             return {"@id": d["url"]}
         elif isinstance(d, str):
             return {"@id": d}
         else:
             return None

     _parse_author = re.compile(
         r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$"
     )

     def normalize_author(self, d):
         r"""https://docs.npmjs.com/files/package.json#people-fields-author-contributors'

         >>> from pprint import pprint
         >>> pprint(NpmMapping().normalize_author({
         ...     'name': 'John Doe',
         ...     'email': 'john.doe@example.org',
         ...     'url': 'https://example.org/~john.doe',
         ... }))
         {'@list': [{'@type': 'http://schema.org/Person',
                     'http://schema.org/email': 'john.doe@example.org',
                     'http://schema.org/name': 'John Doe',
                     'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
         >>> pprint(NpmMapping().normalize_author(
         ...     'John Doe <john.doe@example.org> (https://example.org/~john.doe)'
         ... ))
         {'@list': [{'@type': 'http://schema.org/Person',
                     'http://schema.org/email': 'john.doe@example.org',
                     'http://schema.org/name': 'John Doe',
                     'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]}
         >>> pprint(NpmMapping().normalize_author({
         ...     'name': 'John Doe',
         ...     'email': 'john.doe@example.org',
         ...     'url': 'https:\\\\example.invalid/~john.doe',
         ... }))
         {'@list': [{'@type': 'http://schema.org/Person',
                     'http://schema.org/email': 'john.doe@example.org',
                     'http://schema.org/name': 'John Doe'}]}
         """  # noqa
-        author = {"@type": SCHEMA_URI + "Person"}
+        author = {"@type": SCHEMA.Person}
         if isinstance(d, dict):
             name = d.get("name", None)
             email = d.get("email", None)
             url = d.get("url", None)
         elif isinstance(d, str):
             match = self._parse_author.match(d)
             if not match:
                 return None
             name = match.group("name")
             email = match.group("email")
             url = match.group("url")
         else:
             return None
         if name and isinstance(name, str):
-            author[SCHEMA_URI + "name"] = name
+            author[SCHEMA.name] = name
         if email and isinstance(email, str):
-            author[SCHEMA_URI + "email"] = email
+            author[SCHEMA.email] = email
         if url and isinstance(url, str):
             # Workaround for https://github.com/digitalbazaar/pyld/issues/91 : drop
             # URLs that are blatantly invalid early, so PyLD does not crash.
             parsed_url = urllib.parse.urlparse(url)
             if parsed_url.netloc:
-                author[SCHEMA_URI + "url"] = {"@id": url}
+                author[SCHEMA.url] = {"@id": url}

         return {"@list": [author]}

     def normalize_description(self, description):
         r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat
         common mistake that causes issues in the database because of null bytes in
         JSON.

         >>> NpmMapping().normalize_description("foo bar")
         'foo bar'
         >>> NpmMapping().normalize_description(
         ...     "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00"
         ... )
         'foo bar'
         >>> NpmMapping().normalize_description(
         ...     "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 "
         ... )
         'foo bar'
         >>> NpmMapping().normalize_description(
         ...     # invalid UTF-16 and meaningless UTF-8:
         ...     "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00"
         ... ) is None
         True
         >>> NpmMapping().normalize_description(
         ...     # ditto (it looks like little-endian at first)
         ...     "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00"
         ... ) is None
         True
         >>> NpmMapping().normalize_description(None) is None
         True
         """
         if not isinstance(description, str):
             return None

         # XXX: if this function ever needs to support more cases, consider
         # switching to https://pypi.org/project/ftfy/ instead of adding more hacks
         if description.startswith("\ufffd\ufffd") and "\x00" in description:
             # 2 unicode replacement characters followed by '# ' encoded as UTF-16
             # is a common mistake, which indicates a README.md was saved as UTF-16,
             # and some NPM tool opened it as UTF-8 and used the first line as
             # description.
             description_bytes = description.encode()

             # Strip the two unicode replacement characters
             assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd")
             description_bytes = description_bytes[6:]

             # If the following attempts fail to recover the description, discard it
             # entirely because the current indexer storage backend (postgresql) cannot
             # store zero bytes in JSON columns.
             description = None

             if not description_bytes.startswith(b"\x00"):
                 # try UTF-16 little-endian (the most common) first
                 try:
                     description = description_bytes.decode("utf-16le")
                 except UnicodeDecodeError:
                     pass
             if description is None:
                 # if it fails, try UTF-16 big-endian
                 try:
                     description = description_bytes.decode("utf-16be")
                 except UnicodeDecodeError:
                     pass

             if description:
                 if description.startswith("# "):
                     description = description[2:]
                 return description.rstrip()
         return description

     def normalize_license(self, s):
         """https://docs.npmjs.com/files/package.json#license

         >>> NpmMapping().normalize_license('MIT')
         {'@id': 'https://spdx.org/licenses/MIT'}
         """
         if isinstance(s, str):
             return {"@id": "https://spdx.org/licenses/" + s}

     def normalize_homepage(self, s):
         """https://docs.npmjs.com/files/package.json#homepage

         >>> NpmMapping().normalize_homepage('https://example.org/~john.doe')
         {'@id': 'https://example.org/~john.doe'}
         """
         if isinstance(s, str):
             return {"@id": s}

     def normalize_keywords(self, lst):
         """https://docs.npmjs.com/files/package.json#keywords

         >>> NpmMapping().normalize_keywords(['foo', 'bar'])
         ['foo', 'bar']
         """
         if isinstance(lst, list):
             return [x for x in lst if isinstance(x, str)]
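
Note for reviewers: the mojibake handled by normalize_description arises when a UTF-16 file (with BOM) is re-read as UTF-8 with replacement characters. A sketch of how such input typically comes to be:

    # A README saved as UTF-16 (with BOM) ...
    raw = "# foo bar\r\n".encode("utf-16")
    # ... then decoded as UTF-8 by a tool that substitutes U+FFFD:
    garbled = raw.decode("utf-8", errors="replace")
    print(garbled.startswith("\ufffd\ufffd"), "\x00" in garbled)
    # True True  (exactly the pattern the method detects)
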
diff --git a/swh/indexer/metadata_dictionary/nuget.py b/swh/indexer/metadata_dictionary/nuget.py
index 7d50e73..4dd8b50 100644
--- a/swh/indexer/metadata_dictionary/nuget.py
+++ b/swh/indexer/metadata_dictionary/nuget.py
@@ -1,109 +1,109 @@
 # Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os.path
 import re
 from typing import Any, Dict, List, Optional

 import xmltodict

-from swh.indexer.codemeta import _DATA_DIR, SCHEMA_URI, _read_crosstable
+from swh.indexer.codemeta import _DATA_DIR, _read_crosstable
+from swh.indexer.namespaces import SCHEMA
 from swh.indexer.storage.interface import Sha1

 from .base import DictMapping, DirectoryLsEntry, SingleFileIntrinsicMapping

 NUGET_TABLE_PATH = os.path.join(_DATA_DIR, "nuget.csv")

 with open(NUGET_TABLE_PATH) as fd:
     (CODEMETA_TERMS, NUGET_TABLE) = _read_crosstable(fd)


 class NuGetMapping(DictMapping, SingleFileIntrinsicMapping):
     """
     dedicated class for NuGet (.nuspec) mapping and translation
     """

     name = "nuget"
     mapping = NUGET_TABLE["NuGet"]
     mapping["copyright"] = "http://schema.org/copyrightNotice"
     mapping["language"] = "http://schema.org/inLanguage"
     string_fields = [
         "description",
         "version",
         "projectUrl",
         "name",
         "tags",
         "license",
         "licenseUrl",
         "summary",
         "copyright",
         "language",
     ]

     @classmethod
     def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         for entry in file_entries:
             if entry["name"].endswith(b".nuspec"):
                 return [entry["sha1"]]
         return []

     def translate(self, content: bytes) -> Optional[Dict[str, Any]]:
         d = (
             xmltodict.parse(content.strip(b" \n "))
             .get("package", {})
             .get("metadata", {})
         )
         if not isinstance(d, dict):
             self.log.warning("Skipping ill-formed XML content: %s", content)
             return None
         return self._translate_dict(d)

     def normalize_projectUrl(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def translate_repository(self, translated_metadata, v):
         if isinstance(v, dict) and isinstance(v["@url"], str):
             codemeta_key = self.mapping["repository.url"]
             translated_metadata[codemeta_key] = {"@id": v["@url"]}

     def normalize_license(self, v):
         if isinstance(v, dict) and v["@type"] == "expression":
             license_string = v["#text"]
             if not bool(
                 re.search(r" with |\(|\)| and ", license_string, re.IGNORECASE)
             ):
                 return [
                     {"@id": "https://spdx.org/licenses/" + license_type.strip()}
                     for license_type in re.split(
                         r" or ", license_string, flags=re.IGNORECASE
                     )
                 ]
             else:
                 return None

     def normalize_licenseUrl(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def normalize_authors(self, s):
         if isinstance(s, str):
             author_names = [a.strip() for a in s.split(",")]
             authors = [
-                {"@type": SCHEMA_URI + "Person", SCHEMA_URI + "name": name}
-                for name in author_names
+                {"@type": SCHEMA.Person, SCHEMA.name: name} for name in author_names
             ]
             return {"@list": authors}

     def translate_releaseNotes(self, translated_metadata, s):
         if isinstance(s, str):
             translated_metadata.setdefault("http://schema.org/releaseNotes", []).append(
                 s
             )

     def normalize_tags(self, s):
         if isinstance(s, str):
             return s.split(" ")
diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py
index 686deed..8a1b2ee 100644
--- a/swh/indexer/metadata_dictionary/python.py
+++ b/swh/indexer/metadata_dictionary/python.py
@@ -1,76 +1,75 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import email.parser
 import email.policy
 import itertools

-from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
+from swh.indexer.namespaces import SCHEMA

 from .base import DictMapping, SingleFileIntrinsicMapping

 _normalize_pkginfo_key = str.lower


 class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy):
     def header_fetch_parse(self, name, value):
         if hasattr(value, "name"):
             return value
         value = value.replace("\n ", "\n")
         return self.header_factory(name, value)


 class PythonPkginfoMapping(DictMapping, SingleFileIntrinsicMapping):
     """Dedicated class for Python's PKG-INFO mapping and translation.

     https://www.python.org/dev/peps/pep-0314/"""

     name = "pkg-info"
     filename = b"PKG-INFO"
     mapping = {
         _normalize_pkginfo_key(k): v
         for (k, v) in CROSSWALK_TABLE["Python PKG-INFO"].items()
     }
     string_fields = [
         "name",
         "version",
         "description",
         "summary",
         "author",
         "author-email",
     ]

     _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy())

     def translate(self, content):
         msg = self._parser.parsebytes(content)
         d = {}
         for (key, value) in msg.items():
             key = _normalize_pkginfo_key(key)
             if value != "UNKNOWN":
                 d.setdefault(key, []).append(value)
         metadata = self._translate_dict(d, normalize=False)
-        if SCHEMA_URI + "author" in metadata or SCHEMA_URI + "email" in metadata:
-            metadata[SCHEMA_URI + "author"] = {
+        if SCHEMA.author in metadata or SCHEMA.email in metadata:
+            metadata[SCHEMA.author] = {
                 "@list": [
                     {
-                        "@type": SCHEMA_URI + "Person",
-                        SCHEMA_URI
-                        + "name": metadata.pop(SCHEMA_URI + "author", [None])[0],
-                        SCHEMA_URI
-                        + "email": metadata.pop(SCHEMA_URI + "email", [None])[0],
+                        "@type": SCHEMA.Person,
+                        SCHEMA.name: metadata.pop(SCHEMA.author, [None])[0],
+                        SCHEMA.email: metadata.pop(SCHEMA.email, [None])[0],
                     }
                 ]
             }
         return self.normalize_translation(metadata)

     def normalize_home_page(self, urls):
         return [{"@id": url} for url in urls]

     def normalize_keywords(self, keywords):
         return list(itertools.chain.from_iterable(s.split(" ") for s in keywords))

     def normalize_license(self, licenses):
         return [{"@id": license} for license in licenses]
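
Note for reviewers: PKG-INFO is a block of RFC 822-style headers, which is why the mapping above parses it with email.parser. A toy illustration (hypothetical payload, illustration only):

    import email.parser

    msg = email.parser.BytesHeaderParser().parsebytes(
        b"Name: foo\nVersion: 1.0\nAuthor: Jane Doe\nAuthor-email: jane@example.org\n"
    )
    print(msg["Name"], msg["Version"], msg["Author"])
    # foo 1.0 Jane Doe
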
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
index bdb06aa..246e69f 100644
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -1,135 +1,136 @@
 # Copyright (C) 2018-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import ast
 import itertools
 import re
 from typing import List

-from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.codemeta import CROSSWALK_TABLE
 from swh.indexer.metadata_dictionary.base import DirectoryLsEntry
+from swh.indexer.namespaces import SCHEMA
 from swh.indexer.storage.interface import Sha1

 from .base import BaseIntrinsicMapping, DictMapping


 def name_to_person(name):
     return {
-        "@type": SCHEMA_URI + "Person",
-        SCHEMA_URI + "name": name,
+        "@type": SCHEMA.Person,
+        SCHEMA.name: name,
     }


 class GemspecMapping(BaseIntrinsicMapping, DictMapping):
     name = "gemspec"
     mapping = CROSSWALK_TABLE["Ruby Gem"]
     string_fields = ["name", "version", "description", "summary", "email"]

     _re_spec_new = re.compile(r".*Gem::Specification.new +(do|\{) +\|.*\|.*")
     _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")

     @classmethod
     def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         for entry in file_entries:
             if entry["name"].endswith(b".gemspec"):
                 return [entry["sha1"]]
         return []

     def translate(self, raw_content):
         try:
             raw_content = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
             return

         # Skip lines before 'Gem::Specification.new'
         lines = itertools.dropwhile(
             lambda x: not self._re_spec_new.match(x), raw_content.split("\n")
         )

         try:
             next(lines)  # Consume 'Gem::Specification.new'
         except StopIteration:
             self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
             return

         content_dict = {}
         for line in lines:
             match = self._re_spec_entry.match(line)
             if match:
                 value = self.eval_ruby_expression(match.group("expr"))
                 if value:
                     content_dict[match.group("key")] = value
         return self._translate_dict(content_dict)

     def eval_ruby_expression(self, expr):
         """Very simple evaluator of Ruby expressions.

         >>> GemspecMapping().eval_ruby_expression('"Foo bar"')
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression("'Foo bar'")
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']")
         ['Foo', 'bar']
         >>> GemspecMapping().eval_ruby_expression("'Foo bar'.freeze")
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression( \
                 "['Foo'.freeze, 'bar'.freeze]")
         ['Foo', 'bar']
         """

         def evaluator(node):
             if isinstance(node, ast.Str):
                 return node.s
             elif isinstance(node, ast.List):
                 res = []
                 for element in node.elts:
                     val = evaluator(element)
                     if not val:
                         return
                     res.append(val)
                 return res

         expr = expr.replace(".freeze", "")
         try:
             # We're parsing Ruby expressions here, but Python's
             # ast.parse works for very simple Ruby expressions
             # (mainly strings delimited with " or ', and lists
             # of such strings).
             tree = ast.parse(expr, mode="eval")
         except (SyntaxError, ValueError):
             return
         if isinstance(tree, ast.Expression):
             return evaluator(tree.body)

     def normalize_homepage(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def normalize_license(self, s):
         if isinstance(s, str):
             return [{"@id": "https://spdx.org/licenses/" + s}]

     def normalize_licenses(self, licenses):
         if isinstance(licenses, list):
             return [
                 {"@id": "https://spdx.org/licenses/" + license}
                 for license in licenses
                 if isinstance(license, str)
             ]

     def normalize_author(self, author):
         if isinstance(author, str):
             return {"@list": [name_to_person(author)]}

     def normalize_authors(self, authors):
         if isinstance(authors, list):
             return {
                 "@list": [
                     name_to_person(author)
                     for author in authors
                     if isinstance(author, str)
                 ]
             }
diff --git a/swh/indexer/namespaces.py b/swh/indexer/namespaces.py
new file mode 100644
index 0000000..9267010
--- /dev/null
+++ b/swh/indexer/namespaces.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+class _Namespace:
+    """Handy class to get terms within a namespace by accessing them as attributes.
+
+    This is similar to `rdflib's namespaces
+    <https://rdflib.readthedocs.io/>`__
+    """
+
+    def __init__(self, uri: str):
+        if not uri.endswith(("#", "/")):
+            # Sanity check, to make sure it doesn't end with an alphanumerical
+            # character, which is very likely to be invalid.
+            raise ValueError(f"Invalid trailing character for namespace URI: {uri}")
+        self._uri = uri
+
+    def __getattr__(self, term: str) -> str:
+        return self._uri + term
+
+
+SCHEMA = _Namespace("http://schema.org/")
+CODEMETA = _Namespace("https://codemeta.github.io/terms/")
+FORGEFED = _Namespace("https://forgefed.org/ns#")
+ACTIVITYSTREAMS = _Namespace("https://www.w3.org/ns/activitystreams#")
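
Note for reviewers: attribute access on a _Namespace simply concatenates the term onto the namespace URI, and the constructor rejects namespaces that do not end in "/" or "#". For instance:

    from swh.indexer.namespaces import SCHEMA, FORGEFED, _Namespace

    print(SCHEMA.name)          # http://schema.org/name
    print(FORGEFED.Repository)  # https://forgefed.org/ns#Repository

    try:
        _Namespace("http://schema.org")  # no trailing '/' or '#'
    except ValueError as e:
        print(e)  # Invalid trailing character for namespace URI: http://schema.org
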