diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py index b157232..1148f7e 100644 --- a/swh/indexer/codemeta.py +++ b/swh/indexer/codemeta.py @@ -1,204 +1,206 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import csv import itertools import json import os.path import re from pyld import jsonld import swh.indexer _DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), "data") CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, "codemeta", "crosswalk.csv") CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta.jsonld") with open(CODEMETA_CONTEXT_PATH) as fd: CODEMETA_CONTEXT = json.load(fd) CODEMETA_CONTEXT_URL = "https://doi.org/10.5063/schema/codemeta-2.0" CODEMETA_ALTERNATE_CONTEXT_URLS = { ("https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld") } CODEMETA_URI = "https://codemeta.github.io/terms/" SCHEMA_URI = "http://schema.org/" +FORGEFED_URI = "https://forgefed.org/ns#" +ACTIVITYSTREAMS_URI = "https://www.w3.org/ns/activitystreams#" PROPERTY_BLACKLIST = { # CodeMeta properties that we cannot properly represent. SCHEMA_URI + "softwareRequirements", CODEMETA_URI + "softwareSuggestions", # Duplicate of 'author' SCHEMA_URI + "creator", } _codemeta_field_separator = re.compile(r"\s*[,/]\s*") def make_absolute_uri(local_name): definition = CODEMETA_CONTEXT["@context"][local_name] if isinstance(definition, str): return definition elif isinstance(definition, dict): prefixed_name = definition["@id"] (prefix, local_name) = prefixed_name.split(":") if prefix == "schema": canonical_name = SCHEMA_URI + local_name elif prefix == "codemeta": canonical_name = CODEMETA_URI + local_name else: assert False, prefix return canonical_name else: assert False, definition def _read_crosstable(fd): reader = csv.reader(fd) try: header = next(reader) except StopIteration: raise ValueError("empty file") data_sources = set(header) - {"Parent Type", "Property", "Type", "Description"} assert "codemeta-V1" in data_sources codemeta_translation = {data_source: {} for data_source in data_sources} terms = set() for line in reader: # For each canonical name local_name = dict(zip(header, line))["Property"] if not local_name: continue canonical_name = make_absolute_uri(local_name) if canonical_name in PROPERTY_BLACKLIST: continue terms.add(canonical_name) for (col, value) in zip(header, line): # For each cell in the row if col in data_sources: # If that's not the parentType/property/type/description for local_name in _codemeta_field_separator.split(value): # For each of the data source's properties that maps # to this canonical name if local_name.strip(): codemeta_translation[col][local_name.strip()] = canonical_name return (terms, codemeta_translation) with open(CROSSWALK_TABLE_PATH) as fd: (CODEMETA_TERMS, CROSSWALK_TABLE) = _read_crosstable(fd) def _document_loader(url, options=None): """Document loader for pyld. Reads the local codemeta.jsonld file instead of fetching it from the Internet every single time.""" if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS: return { "contextUrl": None, "documentUrl": url, "document": CODEMETA_CONTEXT, } elif url == CODEMETA_URI: raise Exception( "{} is CodeMeta's URI, use {} as context url".format( CODEMETA_URI, CODEMETA_CONTEXT_URL ) ) else: raise Exception(url) def compact(doc): """Same as `pyld.jsonld.compact`, but in the context of CodeMeta.""" return jsonld.compact( doc, CODEMETA_CONTEXT_URL, options={"documentLoader": _document_loader} ) def expand(doc): """Same as `pyld.jsonld.expand`, but in the context of CodeMeta.""" return jsonld.expand(doc, options={"documentLoader": _document_loader}) def merge_values(v1, v2): """If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`, returns `{"@list": l1 + l2}`. Otherwise, make them lists (if they are not already) and concatenate them. >>> merge_values('a', 'b') ['a', 'b'] >>> merge_values(['a', 'b'], 'c') ['a', 'b', 'c'] >>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']}) {'@list': ['a', 'b', 'c']} """ if v1 is None: return v2 elif v2 is None: return v1 elif isinstance(v1, dict) and set(v1) == {"@list"}: assert isinstance(v1["@list"], list) if isinstance(v2, dict) and set(v2) == {"@list"}: assert isinstance(v2["@list"], list) return {"@list": v1["@list"] + v2["@list"]} else: raise ValueError("Cannot merge %r and %r" % (v1, v2)) else: if isinstance(v2, dict) and "@list" in v2: raise ValueError("Cannot merge %r and %r" % (v1, v2)) if not isinstance(v1, list): v1 = [v1] if not isinstance(v2, list): v2 = [v2] return v1 + v2 def merge_documents(documents): """Takes a list of metadata dicts, each generated from a different metadata file, and merges them. Removes duplicates, if any.""" documents = list(itertools.chain.from_iterable(map(expand, documents))) merged_document = collections.defaultdict(list) for document in documents: for (key, values) in document.items(): if key == "@id": # @id does not get expanded to a list value = values # Only one @id is allowed, move it to sameAs if "@id" not in merged_document: merged_document["@id"] = value elif value != merged_document["@id"]: if value not in merged_document[SCHEMA_URI + "sameAs"]: merged_document[SCHEMA_URI + "sameAs"].append(value) else: for value in values: if isinstance(value, dict) and set(value) == {"@list"}: # Value is of the form {'@list': [item1, item2]} # instead of the usual [item1, item2]. # We need to merge the inner lists (and mostly # preserve order). merged_value = merged_document.setdefault(key, {"@list": []}) for subvalue in value["@list"]: # merged_value must be of the form # {'@list': [item1, item2]}; as it is the same # type as value, which is an @list. if subvalue not in merged_value["@list"]: merged_value["@list"].append(subvalue) elif value not in merged_document[key]: merged_document[key].append(value) return compact(merged_document) diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index 8a1c2f4..44343be 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,198 +1,228 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar from typing_extensions import TypedDict from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values from swh.indexer.storage.interface import Sha1 class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str +TTranslateCallable = TypeVar( + "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None] +) + + +def produce_terms( + namespace: str, terms: List[str] +) -> Callable[[TTranslateCallable], TTranslateCallable]: + """Returns a decorator that marks the decorated function as adding + the given terms to the ``translated_metadata`` dict""" + + def decorator(f: TTranslateCallable) -> TTranslateCallable: + if not hasattr(f, "produced_terms"): + f.produced_terms = [] # type: ignore + f.produced_terms.extend(namespace + term for term in terms) # type: ignore + return f + + return decorator + + class BaseMapping: """Base class for mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") def translate(self, file_content: bytes) -> Optional[Dict]: raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata) class SingleFileMapping(BaseMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: # this class is only used by intrinsic metadata mappings return () class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields = [] # type: List[str] """List of fields that are simple strings, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): - return { + # one-to-one mapping from the original key to a CodeMeta term + simple_terms = { term for (key, term) in cls.mapping.items() if key in cls.string_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) - or hasattr(cls, "translate_" + cls._normalize_method_name(key)) } + # more complex mapping from the original key to JSON-LD + complex_terms = { + term + for meth_name in dir(cls) + if meth_name.startswith("translate_") + for term in getattr(getattr(cls, meth_name), "produced_terms", []) + } + + return simple_terms | complex_terms + def _translate_dict( self, content_dict: Dict, *, normalize: bool = True ) -> Dict[str, str]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"} for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(translated_metadata, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) elif k in self.string_fields and isinstance(v, str): pass elif k in self.string_fields and isinstance(v, list): v = [x for x in v if isinstance(x, str)] else: continue # set the translation metadata with the normalized value if codemeta_key in translated_metadata: translated_metadata[codemeta_key] = merge_values( translated_metadata[codemeta_key], v ) else: translated_metadata[codemeta_key] = v if normalize: return self.normalize_translation(translated_metadata) else: return translated_metadata class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/github.py b/swh/indexer/metadata_dictionary/github.py index 1960c5d..2df751b 100644 --- a/swh/indexer/metadata_dictionary/github.py +++ b/swh/indexer/metadata_dictionary/github.py @@ -1,41 +1,73 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json -from typing import List, Tuple +from typing import Any, Dict, List, Tuple -from swh.indexer.codemeta import SCHEMA_URI +from swh.indexer.codemeta import ACTIVITYSTREAMS_URI, FORGEFED_URI, SCHEMA_URI from swh.indexer.storage.interface import Sha1 -from .base import DirectoryLsEntry, JsonMapping +from .base import DirectoryLsEntry, JsonMapping, produce_terms def _prettyprint(d): print(json.dumps(d, indent=4)) class GitHubMapping(JsonMapping): name = "github" mapping = { "name": SCHEMA_URI + "name", "license": SCHEMA_URI + "license", } string_fields = ["name"] @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: return [] @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: return ("application/vnd.github.v3+json",) + def _translate_dict(self, content_dict: Dict[str, Any], **kwargs) -> Dict[str, Any]: + d = super()._translate_dict(content_dict, **kwargs) + d["type"] = FORGEFED_URI + "Repository" + return d + + @produce_terms(FORGEFED_URI, ["forks"]) + @produce_terms(ACTIVITYSTREAMS_URI, ["totalItems"]) + def translate_forks_count( + self, translated_metadata: Dict[str, Any], v: Any + ) -> None: + """ + + >>> translated_metadata = {} + >>> GitHubMapping().translate_forks_count(translated_metadata, 42) + >>> _prettyprint(translated_metadata) + { + "https://forgefed.org/ns#forks": [ + { + "@type": "https://www.w3.org/ns/activitystreams#OrderedCollection", + "https://www.w3.org/ns/activitystreams#totalItems": 42 + } + ] + } + """ + if isinstance(v, int): + translated_metadata.setdefault(FORGEFED_URI + "forks", []).append( + { + "@type": ACTIVITYSTREAMS_URI + "OrderedCollection", + ACTIVITYSTREAMS_URI + "totalItems": v, + } + ) + def normalize_license(self, d): """ >>> GitHubMapping().normalize_license({'spdx_id': 'MIT'}) {'@id': 'https://spdx.org/licenses/MIT'} """ if isinstance(d, dict) and isinstance(d.get("spdx_id"), str): return {"@id": "https://spdx.org/licenses/" + d["spdx_id"]} diff --git a/swh/indexer/tests/metadata_dictionary/test_github.py b/swh/indexer/tests/metadata_dictionary/test_github.py index 4197fa0..d6739ec 100644 --- a/swh/indexer/tests/metadata_dictionary/test_github.py +++ b/swh/indexer/tests/metadata_dictionary/test_github.py @@ -1,113 +1,122 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.indexer.metadata_dictionary import MAPPINGS def test_compute_metadata_none(): """ testing content empty content is empty should return None """ content = b"" # None if no metadata was found or an error occurred declared_metadata = None result = MAPPINGS["GitHubMapping"]().translate(content) assert declared_metadata == result def test_supported_terms(): terms = MAPPINGS["GitHubMapping"].supported_terms() - assert {"http://schema.org/name", "http://schema.org/license"} <= terms + assert { + "http://schema.org/name", + "http://schema.org/license", + "https://forgefed.org/ns#forks", + "https://www.w3.org/ns/activitystreams#totalItems", + } <= terms def test_compute_metadata_github(): """ testing only computation of metadata with hard_mapping_npm """ content = b""" { "id": 80521091, "node_id": "MDEwOlJlcG9zaXRvcnk4MDUyMTA5MQ==", "name": "swh-indexer", "full_name": "SoftwareHeritage/swh-indexer", "private": false, "owner": { "login": "SoftwareHeritage", "id": 18555939, "node_id": "MDEyOk9yZ2FuaXphdGlvbjE4NTU1OTM5", "avatar_url": "https://avatars.githubusercontent.com/u/18555939?v=4", "gravatar_id": "", "url": "https://api.github.com/users/SoftwareHeritage", "type": "Organization", "site_admin": false }, "html_url": "https://github.com/SoftwareHeritage/swh-indexer", "description": "GitHub mirror of Metadata indexer", "fork": false, "url": "https://api.github.com/repos/SoftwareHeritage/swh-indexer", "created_at": "2017-01-31T13:05:39Z", "updated_at": "2022-06-22T08:02:20Z", "pushed_at": "2022-06-29T09:01:08Z", "git_url": "git://github.com/SoftwareHeritage/swh-indexer.git", "ssh_url": "git@github.com:SoftwareHeritage/swh-indexer.git", "clone_url": "https://github.com/SoftwareHeritage/swh-indexer.git", "svn_url": "https://github.com/SoftwareHeritage/swh-indexer", "homepage": "https://forge.softwareheritage.org/source/swh-indexer/", "size": 2713, "stargazers_count": 13, "watchers_count": 13, "language": "Python", "has_issues": false, "has_projects": false, "has_downloads": true, "has_wiki": false, "has_pages": false, "forks_count": 1, "mirror_url": null, "archived": false, "disabled": false, "open_issues_count": 0, "license": { "key": "gpl-3.0", "name": "GNU General Public License v3.0", "spdx_id": "GPL-3.0", "url": "https://api.github.com/licenses/gpl-3.0", "node_id": "MDc6TGljZW5zZTk=" }, "allow_forking": true, "is_template": false, "web_commit_signoff_required": false, "topics": [ ], "visibility": "public", "forks": 1, "open_issues": 0, "watchers": 13, "default_branch": "master", "temp_clone_token": null, "organization": { "login": "SoftwareHeritage", "id": 18555939, "node_id": "MDEyOk9yZ2FuaXphdGlvbjE4NTU1OTM5", "avatar_url": "https://avatars.githubusercontent.com/u/18555939?v=4", "gravatar_id": "", "type": "Organization", "site_admin": false }, "network_count": 1, "subscribers_count": 6 } """ result = MAPPINGS["GitHubMapping"]().translate(content) assert result == { "@context": "https://doi.org/10.5063/schema/codemeta-2.0", - "type": "SoftwareSourceCode", + "type": "https://forgefed.org/ns#Repository", + "https://forgefed.org/ns#forks": { + "https://www.w3.org/ns/activitystreams#totalItems": 1, + "type": "https://www.w3.org/ns/activitystreams#OrderedCollection", + }, "license": "https://spdx.org/licenses/GPL-3.0", "name": "swh-indexer", }