diff --git a/docs/metadata-workflow.rst b/docs/metadata-workflow.rst index f913f49..a5cb8e6 100644 --- a/docs/metadata-workflow.rst +++ b/docs/metadata-workflow.rst @@ -1,208 +1,210 @@
Metadata workflow
=================

Intrinsic metadata
------------------

Indexing :term:`intrinsic metadata` requires extracting information from the
lowest levels of the :ref:`Merkle DAG` (directories, files, and content blobs)
and associating it with the highest ones (origins).
In order to deduplicate the work between origins, we split this work between
multiple indexers, which coordinate with each other and save their results at
each step in the indexer storage.

Indexer architecture
--------------------

.. thumbnail:: images/tasks-metadata-indexers.svg

Origin-Head Indexer
___________________

First, the Origin-Head indexer gets called externally, with an origin as
argument (or multiple origins, which are handled sequentially).
For now, its tasks are scheduled manually via recurring Scheduler tasks; but in
the near future, the :term:`journal` will be used to do that.

It first looks up the last :term:`snapshot` and determines what the main
branch of the origin is (the "Head branch") and what revision it points to
(the "Head"). Intrinsic metadata for that origin will be extracted from that
revision.

It schedules a Revision Metadata Indexer task for that revision, with a hint
that the revision is the Head of that particular origin.

Revision and Content Metadata Indexers
______________________________________

These two indexers do the hard part of the work. The Revision Metadata Indexer
fetches the root directory associated with a revision, then extracts the
metadata from that directory.

To do so, it lists files in that directory, and looks for known names, such as
:file:`codemeta.json`, :file:`package.json`, or :file:`pom.xml`. If there are
any, it runs the Content Metadata Indexer on them, which in turn fetches their
contents and runs them through extraction dictionaries/mappings.
See below for details.

Their results are saved in a database (the indexer storage), associated with
the content and revision hashes.

If it received a hint that this revision is the head of an origin, the
Revision Metadata Indexer then schedules the Origin Metadata Indexer to run on
that origin.

Origin Metadata Indexer
_______________________

The job of this indexer is very simple: it takes an origin identifier and a
revision hash, and copies the metadata of the revision to a new table, to
associate it with the origin.

The reason for this is to be able to perform searches on metadata, and
efficiently find out which origins match the pattern. Running that search on
the ``revision_metadata`` table would require a reverse lookup from revisions
to origins, which is costly.

Translation from language-specific metadata to CodeMeta
-------------------------------------------------------

Intrinsic metadata are extracted from files provided with a project's source
code, and translated using `CodeMeta`_'s `crosswalk table`_.

All input formats supported so far are straightforward dictionaries (eg. JSON)
or can be accessed as such (eg. XML); and the first part of the translation is
to map their keys to a term in the CodeMeta vocabulary. This is done by
parsing the crosswalk table's `CSV file`_ and using it as a map between these
two vocabularies; and this does not require any format-specific code in the
indexers.
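For instance, this key mapping is a plain dictionary lookup. A minimal sketch
(not the actual indexer code; the exact term returned depends on the contents
of the crosswalk CSV):

.. code-block:: python

    from swh.indexer.codemeta import CROSSWALK_TABLE

    # One sub-table per input vocabulary, eg. "NodeJS" for package.json files.
    nodejs_mapping = CROSSWALK_TABLE["NodeJS"]

    # Maps an input key to a CodeMeta term (a full URI),
    # eg. "description" -> "http://schema.org/description".
    codemeta_term = nodejs_mapping["description"]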
The second part is to normalize values. As language-specific metadata files
each have their own way(s) of formatting these values, we need to turn them
into the data type required by CodeMeta. This normalization makes up most of
the code of :py:mod:`swh.indexer.metadata_dictionary`.

.. _CodeMeta: https://codemeta.github.io/
.. _crosswalk table: https://codemeta.github.io/crosswalk/
.. _CSV file: https://github.com/codemeta/codemeta/blob/master/crosswalk.csv

Supported intrinsic metadata
----------------------------

The following sources of intrinsic metadata are supported:

* CodeMeta's `codemeta.json`_,
* Maven's `pom.xml`_,
* NPM's `package.json`_,
* Python's `PKG-INFO`_,
* Ruby's `.gemspec`_

.. _codemeta.json: https://codemeta.github.io/terms/
.. _pom.xml: https://maven.apache.org/pom.html
.. _package.json: https://docs.npmjs.com/files/package.json
.. _PKG-INFO: https://www.python.org/dev/peps/pep-0314/
.. _.gemspec: https://guides.rubygems.org/specification-reference/

Supported CodeMeta terms
------------------------

The following terms may be found in the output of the metadata translation
(other than the `codemeta` mapping, which is the identity function, and
therefore supports all terms):

.. program-output:: python3 -m swh.indexer.cli mapping list-terms --exclude-mapping codemeta
   :nostderr:

Adding support for additional ecosystem-specific metadata
----------------------------------------------------------

This section will guide you through adding code to the metadata indexer to
detect and translate new metadata formats.

First, you should start by picking one of the `CodeMeta crosswalks`_.
Then create a new file in :file:`swh-indexer/swh/indexer/metadata_dictionary/`,
which will contain your code, and create a new class that inherits from helper
classes, with some documentation about your indexer:

.. code-block:: python

-    from .base import DictMapping, SingleFileMapping
+    from .base import DictMapping, SingleFileIntrinsicMapping
     from swh.indexer.codemeta import CROSSWALK_TABLE

-    class MyMapping(DictMapping, SingleFileMapping):
+    class MyMapping(DictMapping, SingleFileIntrinsicMapping):
         """Dedicated class for ..."""
         name = 'my-mapping'
         filename = b'the-filename'
         mapping = CROSSWALK_TABLE['Name of the CodeMeta crosswalk']

.. _CodeMeta crosswalks: https://github.com/codemeta/codemeta/tree/master/crosswalks

+And reference it from :const:`swh.indexer.metadata_dictionary.INTRINSIC_MAPPINGS`.
+
Then, add a ``string_fields`` attribute, which is the list of all keys whose
values are simple text values. For instance, to `translate Python PKG-INFO`_,
it's:

.. code-block:: python

    string_fields = ['name', 'version', 'description', 'summary',
                     'author', 'author-email']

These values will be automatically added to the above list of supported terms.

.. _translate Python PKG-INFO: https://forge.softwareheritage.org/source/swh-indexer/browse/master/swh/indexer/metadata_dictionary/python.py
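To check which CodeMeta terms an existing mapping ends up advertising, you can
query its ``supported_terms`` class method; a short sketch using the real
``NpmMapping`` (the exact output depends on the crosswalk table):

.. code-block:: python

    from swh.indexer.metadata_dictionary.npm import NpmMapping

    # string_fields, plus any normalize_*/translate_* methods, determine
    # the set of terms a mapping can produce.
    print(sorted(NpmMapping.supported_terms()))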
Last step to get your code working: add a ``translate`` method that will take
a single byte string as argument, turn it into a Python dictionary, whose keys
are the ones of the input document, and pass it to ``_translate_dict``.

For instance, if the input document is in JSON, it can be as simple as:

.. code-block:: python

    def translate(self, raw_content):
        raw_content = raw_content.decode()  # bytes to str
        content_dict = json.loads(raw_content)  # str to dict
        return self._translate_dict(content_dict)  # convert to CodeMeta

``_translate_dict`` will do the heavy work of reading the crosswalk table for
each of ``string_fields``, reading the corresponding value in the
``content_dict``, and building a CodeMeta dictionary with the corresponding
names from the crosswalk table.

One last thing to run your code: add it to the list in
:file:`swh-indexer/swh/indexer/metadata_dictionary/__init__.py`, so the rest
of the code is aware of it.

Now, you can run it:

.. code-block:: shell

    python3 -m swh.indexer.metadata_dictionary MyMapping path/to/input/file

and it will (hopefully) return a CodeMeta object.

If it works, well done!

You can now improve your translation code further, by adding methods that will
do more advanced conversion. For example, if there is a field named
``license`` containing an SPDX identifier, you must convert it to a URI, like
this:

.. code-block:: python

    def normalize_license(self, s):
        if isinstance(s, str):
            return {"@id": "https://spdx.org/licenses/" + s}

This method will automatically get called by ``_translate_dict`` when it finds
a ``license`` field in ``content_dict``.
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index c5ad56f..76be504 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,536 +1,538 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, cast, ) from urllib.parse import urlparse import sentry_sdk from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents from swh.indexer.indexer import ( BaseIndexer, ContentIndexer, DirectoryIndexer, ObjectsDict, OriginIndexer, ) from swh.indexer.metadata_detector import detect_metadata -from swh.indexer.metadata_dictionary import MAPPINGS +from swh.indexer.metadata_dictionary import EXTRINSIC_MAPPINGS, INTRINSIC_MAPPINGS from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.origin_head import get_head_swhid from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.model import hashutil from swh.model.model import Directory, MetadataAuthorityType from swh.model.model import ObjectType as ModelObjectType from swh.model.model import Origin, RawExtrinsicMetadata, Sha1Git from swh.model.swhids import CoreSWHID, ExtendedObjectType, ObjectType REVISION_GET_BATCH_SIZE = 10 RELEASE_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) class ExtrinsicMetadataIndexer( BaseIndexer[Sha1Git, RawExtrinsicMetadata, OriginExtrinsicMetadataRow] ): def process_journal_objects(self, objects: ObjectsDict) -> Dict: summary: Dict[str, Any] = {"status": "uneventful"}
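# Index each incoming raw_extrinsic_metadata object; on error, re-raise unless
# self.catch_exceptions is set, in which case the whole batch is marked "failed".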
try: results = [] for item in objects.get("raw_extrinsic_metadata", []): results.extend( self.index(item["id"], data=RawExtrinsicMetadata.from_dict(item)) ) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index( self, id: Sha1Git, data: Optional[RawExtrinsicMetadata], **kwargs, ) -> List[OriginExtrinsicMetadataRow]: if data is None: raise NotImplementedError( "ExtrinsicMetadataIndexer.index() without RawExtrinsicMetadata data" ) if data.target.object_type != ExtendedObjectType.ORIGIN: # other types are not supported yet return [] if data.authority.type != MetadataAuthorityType.FORGE: # metadata provided by a third-party; don't trust it # (technically this could be handled below, but we check it here # to return early; sparing a translation and origin lookup) # TODO: add ways to define trusted authorities return [] metadata_items = [] mappings = [] - for (mapping_name, mapping) in MAPPINGS.items(): + for (mapping_name, mapping) in EXTRINSIC_MAPPINGS.items(): if data.format in mapping.extrinsic_metadata_formats(): metadata_item = mapping().translate(data.metadata) if metadata_item is not None: metadata_items.append(metadata_item) mappings.append(mapping_name) if not metadata_items: # Don't have any mapping to parse it, ignore return [] # TODO: batch requests to origin_get_by_sha1() origins = self.storage.origin_get_by_sha1([data.target.object_id]) try: (origin,) = origins if origin is None: raise ValueError() except ValueError: raise ValueError(f"Unknown origin {data.target}") from None if urlparse(data.authority.url).netloc != urlparse(origin["url"]).netloc: # metadata provided by a third-party; don't trust it # TODO: add ways to define trusted authorities return [] metadata = merge_documents(metadata_items) return [ OriginExtrinsicMetadataRow( id=origin["url"], indexer_configuration_id=self.tool["id"], from_remd_id=data.id, mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[OriginExtrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.origin_extrinsic_metadata_add(results) class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown directory", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. 
If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) - metadata = MAPPINGS[mapping_name](log_suffix).translate(data) + metadata = INTRINSIC_MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): """Directory-level indexer This indexer is in charge of: - filtering directories already indexed in directory_intrinsic_metadata table with defined computation tool - retrieve all entry_files in directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for directory """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.directory_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Directory] = None, **kwargs ) -> List[DirectoryIntrinsicMetadataRow]: """Index directory by processing it and organizing result. use metadata_detector to iterate on filenames, passes them to the content indexers, then merges (if more than one) Args: id: sha1_git of the directory data: should always be None Returns: dict: dictionary representing a directory_intrinsic_metadata, with keys: - id: directory's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ dir_: List[DirectoryLsEntry] assert data is None, "Unexpected directory object" dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(id, recursive=False)), ) try: if [entry["type"] for entry in dir_] == ["dir"]: # If the root is just a single directory, recurse into it # eg. 
PyPI packages, GNU tarballs subdir = dir_[0]["target"] dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(subdir, recursive=False)), ) files = [entry for entry in dir_ if entry["type"] == "file"] (mappings, metadata) = self.translate_directory_intrinsic_metadata( files, log_suffix="directory=%s" % hashutil.hash_to_hex(id), ) except Exception as e: self.log.exception("Problem when indexing dir: %r", e) sentry_sdk.capture_exception() return [] return [ DirectoryIntrinsicMetadataRow( id=id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[DirectoryIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" # TODO: add functions in storage to keep data in # directory_intrinsic_metadata return self.idx_storage.directory_intrinsic_metadata_add(results) def translate_directory_intrinsic_metadata( self, files: List[DirectoryLsEntry], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata in the given root directory Args: files: list of file entries, as returned by :meth:`swh.storage.interface.StorageInterface.directory_ls` Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] all_detected_files = detect_metadata(files) - used_mappings = [MAPPINGS[context].name for context in all_detected_files] + used_mappings = [ + INTRINSIC_MAPPINGS[context].name for context in all_detected_files + ] for (mapping_name, detected_files) in all_detected_files.items(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = mapping_name c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get(detected_files) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) def index_list( self, origins: List[Origin], check_origin_known: bool = True, **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: head_rev_ids = [] head_rel_ids = [] origin_heads: Dict[Origin, CoreSWHID] = {} # Filter out origins not in the storage if check_origin_known: known_origins = list( call_with_batches( 
self.storage.origin_get, [origin.url for origin in origins], ORIGIN_GET_BATCH_SIZE, ) ) else: known_origins = list(origins) for origin in known_origins: if origin is None: continue head_swhid = get_head_swhid(self.storage, origin.url) if head_swhid: origin_heads[origin] = head_swhid if head_swhid.object_type == ObjectType.REVISION: head_rev_ids.append(head_swhid.object_id) elif head_swhid.object_type == ObjectType.RELEASE: head_rel_ids.append(head_swhid.object_id) else: assert False, head_swhid head_revs = dict( zip( head_rev_ids, call_with_batches( self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE ), ) ) head_rels = dict( zip( head_rel_ids, call_with_batches( self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE ), ) ) results = [] for (origin, head_swhid) in origin_heads.items(): if head_swhid.object_type == ObjectType.REVISION: rev = head_revs[head_swhid.object_id] if not rev: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue directory_id = rev.directory elif head_swhid.object_type == ObjectType.RELEASE: rel = head_rels[head_swhid.object_id] if not rel: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue if rel.target_type != ModelObjectType.DIRECTORY: # TODO self.log.warning( "Head release %s of %r has unexpected target type %s", head_swhid, origin.url, rel.target_type, ) continue assert rel.target, rel directory_id = rel.target else: assert False, head_swhid for dir_metadata in self.directory_metadata_indexer.index(directory_id): # There is at most one dir_metadata orig_metadata = OriginIntrinsicMetadataRow( from_directory=dir_metadata.id, id=origin.url, metadata=dir_metadata.metadata, mappings=dir_metadata.mappings, indexer_configuration_id=dir_metadata.indexer_configuration_id, ) results.append((orig_metadata, dir_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate directories dir_metadata: List[DirectoryIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] summary: Dict = {} for (orig_item, dir_item) in results: assert dir_item.metadata == orig_item.metadata if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if dir_item not in dir_metadata: dir_metadata.append(dir_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) if dir_metadata: summary_dir = self.idx_storage.directory_intrinsic_metadata_add( dir_metadata ) summary.update(summary_dir) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/metadata_detector.py b/swh/indexer/metadata_detector.py index e574b31..9482d0d 100644 --- a/swh/indexer/metadata_detector.py +++ b/swh/indexer/metadata_detector.py @@ -1,28 +1,28 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict, List -from swh.indexer.metadata_dictionary import MAPPINGS +from swh.indexer.metadata_dictionary import INTRINSIC_MAPPINGS from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.storage.interface import Sha1 def detect_metadata(files: List[DirectoryLsEntry]) -> Dict[str, 
List[Sha1]]: """ Detects files potentially containing metadata Args: file_entries (list): list of files Returns: dict: {mapping_filenames[name]:f['sha1']} (may be empty) """ results = {} - for (mapping_name, mapping) in MAPPINGS.items(): + for (mapping_name, mapping) in INTRINSIC_MAPPINGS.items(): matches = mapping.detect_metadata_files(files) if matches: results[mapping_name] = matches return results diff --git a/swh/indexer/metadata_dictionary/__init__.py b/swh/indexer/metadata_dictionary/__init__.py index 4d4f4de..dc3ee6e 100644 --- a/swh/indexer/metadata_dictionary/__init__.py +++ b/swh/indexer/metadata_dictionary/__init__.py @@ -1,47 +1,55 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections +from typing import Dict, Type import click from . import cff, codemeta, composer, github, maven, npm, python, ruby +from .base import BaseExtrinsicMapping, BaseIntrinsicMapping, BaseMapping -MAPPINGS = { +INTRINSIC_MAPPINGS: Dict[str, Type[BaseIntrinsicMapping]] = { "CffMapping": cff.CffMapping, "CodemetaMapping": codemeta.CodemetaMapping, "GemspecMapping": ruby.GemspecMapping, - "GitHubMapping": github.GitHubMapping, "MavenMapping": maven.MavenMapping, "NpmMapping": npm.NpmMapping, "PythonPkginfoMapping": python.PythonPkginfoMapping, "ComposerMapping": composer.ComposerMapping, } +EXTRINSIC_MAPPINGS: Dict[str, Type[BaseExtrinsicMapping]] = { + "GitHubMapping": github.GitHubMapping, +} + + +MAPPINGS: Dict[str, Type[BaseMapping]] = {**INTRINSIC_MAPPINGS, **EXTRINSIC_MAPPINGS} + def list_terms(): """Returns a dictionary with all supported CodeMeta terms as keys, and the mappings that support each of them as values.""" d = collections.defaultdict(set) for mapping in MAPPINGS.values(): for term in mapping.supported_terms(): d[term].add(mapping) return d @click.command() @click.argument("mapping_name") @click.argument("file_name") def main(mapping_name: str, file_name: str): from pprint import pprint with open(file_name, "rb") as fd: file_content = fd.read() res = MAPPINGS[mapping_name]().translate(file_content) pprint(res) if __name__ == "__main__": main() diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index 82b3daf..0f4da70 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,229 +1,240 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar from typing_extensions import TypedDict from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values from swh.indexer.storage.interface import Sha1 class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None] ) def produce_terms( namespace: str, terms: List[str] ) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): 
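# lazily create the accumulator the first time @produce_terms is applied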
f.produced_terms = [] # type: ignore f.produced_terms.extend(namespace + term for term in terms) # type: ignore return f return decorator class BaseMapping: - """Base class for mappings to inherit from - - To implement a new mapping: - - - inherit this class - - override translate function - """ + """Base class for :class:`BaseExtrinsicMapping` and :class:`BaseIntrinsicMapping`, + not to be inherited directly.""" def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") - @classmethod - def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: - """ - Returns the sha1 hashes of files which can be translated by this mapping - """ - raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") + def translate(self, file_content: bytes) -> Optional[Dict]: + """Translates metadata, from the content of a file or of a RawExtrinsicMetadata + object.""" + raise NotImplementedError(f"{self.__class__.__name__}.translate") + + def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: + return compact(metadata) + + +class BaseExtrinsicMapping(BaseMapping): + """Base class for extrinsic-metadata mappings to inherit from + + To implement a new mapping: + + - inherit this class + - override translate function + """ @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") - def translate(self, file_content: bytes) -> Optional[Dict]: - """Translates intrinsic metadata, from the content of a file.""" - raise NotImplementedError(f"{self.__class__.__name__}.translate") - def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: - return compact(metadata) +class BaseIntrinsicMapping(BaseMapping): + """Base class for intrinsic-metadata mappings to inherit from + + To implement a new mapping: + + - inherit this class + - override translate function + """ + + @classmethod + def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: + """ + Returns the sha1 hashes of files which can be translated by this mapping + """ + raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") -class SingleFileMapping(BaseMapping): +class SingleFileIntrinsicMapping(BaseIntrinsicMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] - @classmethod - def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: - # this class is only used by intrinsic metadata mappings - return () - class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. 
a shallow JSON dict).""" string_fields = [] # type: List[str] """List of fields that are simple strings, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { term for (key, term) in cls.mapping.items() if key in cls.string_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { term for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms def _translate_dict( self, content_dict: Dict, *, normalize: bool = True ) -> Dict[str, str]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"} for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(translated_metadata, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) elif k in self.string_fields and isinstance(v, str): pass elif k in self.string_fields and isinstance(v, list): v = [x for x in v if isinstance(x, str)] else: continue # set the translation metadata with the normalized value if codemeta_key in translated_metadata: translated_metadata[codemeta_key] = merge_values( translated_metadata[codemeta_key], v ) else: translated_metadata[codemeta_key] = v if normalize: return self.normalize_translation(translated_metadata) else: return translated_metadata class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/cff.py b/swh/indexer/metadata_dictionary/cff.py index 6f35511..48be831 100644 --- a/swh/indexer/metadata_dictionary/cff.py +++ b/swh/indexer/metadata_dictionary/cff.py @@ -1,74 +1,74 @@ from typing import Dict, List, Optional, Union import yaml from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI 
-from .base import DictMapping, SingleFileMapping +from .base import DictMapping, SingleFileIntrinsicMapping class SafeLoader(yaml.SafeLoader): yaml_implicit_resolvers = { k: [r for r in v if r[0] != "tag:yaml.org,2002:timestamp"] for k, v in yaml.SafeLoader.yaml_implicit_resolvers.items() } -class CffMapping(DictMapping, SingleFileMapping): +class CffMapping(DictMapping, SingleFileIntrinsicMapping): """Dedicated class for Citation (CITATION.cff) mapping and translation""" name = "cff" filename = b"CITATION.cff" mapping = CROSSWALK_TABLE["Citation File Format Core (CFF-Core) 1.0.2"] string_fields = ["keywords", "license", "abstract", "version", "doi"] def translate(self, raw_content: bytes) -> Optional[Dict[str, str]]: raw_content_string: str = raw_content.decode() try: content_dict = yaml.load(raw_content_string, Loader=SafeLoader) except yaml.scanner.ScannerError: return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None def normalize_authors(self, d: List[dict]) -> Dict[str, list]: result = [] for author in d: author_data: Dict[str, Optional[Union[str, Dict]]] = { "@type": SCHEMA_URI + "Person" } if "orcid" in author and isinstance(author["orcid"], str): author_data["@id"] = author["orcid"] if "affiliation" in author and isinstance(author["affiliation"], str): author_data[SCHEMA_URI + "affiliation"] = { "@type": SCHEMA_URI + "Organization", SCHEMA_URI + "name": author["affiliation"], } if "family-names" in author and isinstance(author["family-names"], str): author_data[SCHEMA_URI + "familyName"] = author["family-names"] if "given-names" in author and isinstance(author["given-names"], str): author_data[SCHEMA_URI + "givenName"] = author["given-names"] result.append(author_data) result_final = {"@list": result} return result_final def normalize_doi(self, s: str) -> Dict[str, str]: if isinstance(s, str): return {"@id": "https://doi.org/" + s} def normalize_license(self, s: str) -> Dict[str, str]: if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} def normalize_repository_code(self, s: str) -> Dict[str, str]: if isinstance(s, str): return {"@id": s} def normalize_date_released(self, s: str) -> Dict[str, str]: if isinstance(s, str): return {"@value": s, "@type": SCHEMA_URI + "Date"} diff --git a/swh/indexer/metadata_dictionary/codemeta.py b/swh/indexer/metadata_dictionary/codemeta.py index 0bbb3fa..f0f0d09 100644 --- a/swh/indexer/metadata_dictionary/codemeta.py +++ b/swh/indexer/metadata_dictionary/codemeta.py @@ -1,31 +1,31 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from typing import Any, Dict, List, Optional from swh.indexer.codemeta import CODEMETA_TERMS, expand -from .base import SingleFileMapping +from .base import SingleFileIntrinsicMapping -class CodemetaMapping(SingleFileMapping): +class CodemetaMapping(SingleFileIntrinsicMapping): """ dedicated class for CodeMeta (codemeta.json) mapping and translation """ name = "codemeta" filename = b"codemeta.json" string_fields = None @classmethod def supported_terms(cls) -> List[str]: return [term for term in CODEMETA_TERMS if not term.startswith("@")] def translate(self, content: bytes) -> Optional[Dict[str, Any]]: try: return self.normalize_translation(expand(json.loads(content.decode()))) except Exception: return None diff --git 
a/swh/indexer/metadata_dictionary/composer.py b/swh/indexer/metadata_dictionary/composer.py index cd217df..c02f5d8 100644 --- a/swh/indexer/metadata_dictionary/composer.py +++ b/swh/indexer/metadata_dictionary/composer.py @@ -1,56 +1,56 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path from swh.indexer.codemeta import _DATA_DIR, SCHEMA_URI, _read_crosstable -from .base import JsonMapping, SingleFileMapping +from .base import JsonMapping, SingleFileIntrinsicMapping COMPOSER_TABLE_PATH = os.path.join(_DATA_DIR, "composer.csv") with open(COMPOSER_TABLE_PATH) as fd: (CODEMETA_TERMS, COMPOSER_TABLE) = _read_crosstable(fd) -class ComposerMapping(JsonMapping, SingleFileMapping): +class ComposerMapping(JsonMapping, SingleFileIntrinsicMapping): """Dedicated class for Packagist(composer.json) mapping and translation""" name = "composer" mapping = COMPOSER_TABLE["Composer"] filename = b"composer.json" string_fields = [ "name", "description", "version", "keywords", "homepage", "license", "author", "authors", ] def normalize_homepage(self, s): if isinstance(s, str): return {"@id": s} def normalize_license(self, s): if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} def normalize_authors(self, author_list): authors = [] for author in author_list: author_obj = {"@type": SCHEMA_URI + "Person"} if isinstance(author, dict): if isinstance(author.get("name", None), str): author_obj[SCHEMA_URI + "name"] = author.get("name", None) if isinstance(author.get("email", None), str): author_obj[SCHEMA_URI + "email"] = author.get("email", None) authors.append(author_obj) return {"@list": authors} diff --git a/swh/indexer/metadata_dictionary/github.py b/swh/indexer/metadata_dictionary/github.py index 6bebbd5..08e62d3 100644 --- a/swh/indexer/metadata_dictionary/github.py +++ b/swh/indexer/metadata_dictionary/github.py @@ -1,78 +1,74 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information + import json -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, Tuple from swh.indexer.codemeta import ACTIVITYSTREAMS_URI, CROSSWALK_TABLE, FORGEFED_URI -from swh.indexer.storage.interface import Sha1 -from .base import DirectoryLsEntry, JsonMapping, produce_terms +from .base import BaseExtrinsicMapping, JsonMapping, produce_terms def _prettyprint(d): print(json.dumps(d, indent=4)) -class GitHubMapping(JsonMapping): +class GitHubMapping(BaseExtrinsicMapping, JsonMapping): name = "github" mapping = CROSSWALK_TABLE["GitHub"] string_fields = [ "archive_url", "created_at", "updated_at", "description", "full_name", "html_url", "issues_url", ] - @classmethod - def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: - return [] - @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: return ("application/vnd.github.v3+json",) def _translate_dict(self, content_dict: Dict[str, Any], **kwargs) -> Dict[str, Any]: d = super()._translate_dict(content_dict, **kwargs) d["type"] = FORGEFED_URI + "Repository" return d @produce_terms(FORGEFED_URI, ["forks"]) @produce_terms(ACTIVITYSTREAMS_URI, ["totalItems"]) def translate_forks_count( self, translated_metadata: 
Dict[str, Any], v: Any ) -> None: """ >>> translated_metadata = {} >>> GitHubMapping().translate_forks_count(translated_metadata, 42) >>> _prettyprint(translated_metadata) { "https://forgefed.org/ns#forks": [ { "@type": "https://www.w3.org/ns/activitystreams#OrderedCollection", "https://www.w3.org/ns/activitystreams#totalItems": 42 } ] } """ if isinstance(v, int): translated_metadata.setdefault(FORGEFED_URI + "forks", []).append( { "@type": ACTIVITYSTREAMS_URI + "OrderedCollection", ACTIVITYSTREAMS_URI + "totalItems": v, } ) def normalize_license(self, d): """ >>> GitHubMapping().normalize_license({'spdx_id': 'MIT'}) {'@id': 'https://spdx.org/licenses/MIT'} """ if isinstance(d, dict) and isinstance(d.get("spdx_id"), str): return {"@id": "https://spdx.org/licenses/" + d["spdx_id"]} diff --git a/swh/indexer/metadata_dictionary/maven.py b/swh/indexer/metadata_dictionary/maven.py index ad4c5ed..419eb74 100644 --- a/swh/indexer/metadata_dictionary/maven.py +++ b/swh/indexer/metadata_dictionary/maven.py @@ -1,162 +1,162 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Any, Dict, Optional import xml.parsers.expat import xmltodict from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI -from .base import DictMapping, SingleFileMapping +from .base import DictMapping, SingleFileIntrinsicMapping -class MavenMapping(DictMapping, SingleFileMapping): +class MavenMapping(DictMapping, SingleFileIntrinsicMapping): """ dedicated class for Maven (pom.xml) mapping and translation """ name = "maven" filename = b"pom.xml" mapping = CROSSWALK_TABLE["Java (Maven)"] string_fields = ["name", "version", "description", "email"] def translate(self, content: bytes) -> Optional[Dict[str, Any]]: try: d = xmltodict.parse(content).get("project") or {} except xml.parsers.expat.ExpatError: self.log.warning("Error parsing XML from %s", self.log_suffix) return None except UnicodeDecodeError: self.log.warning("Error unidecoding XML from %s", self.log_suffix) return None except (LookupError, ValueError): # unknown encoding or multi-byte encoding self.log.warning("Error detecting XML encoding from %s", self.log_suffix) return None if not isinstance(d, dict): self.log.warning("Skipping ill-formed XML content: %s", content) return None metadata = self._translate_dict(d, normalize=False) metadata[SCHEMA_URI + "codeRepository"] = self.parse_repositories(d) metadata[SCHEMA_URI + "license"] = self.parse_licenses(d) return self.normalize_translation(metadata) _default_repository = {"url": "https://repo.maven.apache.org/maven2/"} def parse_repositories(self, d): """https://maven.apache.org/pom.html#Repositories >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... <repositories> ... <repository> ... <id>codehausSnapshots</id> ... <name>Codehaus Snapshots</name> ... <url>http://snapshots.maven.codehaus.org/maven2</url> ... <layout>default</layout> ... </repository> ... </repositories> ...
''') >>> MavenMapping().parse_repositories(d) """ repositories = d.get("repositories") if not repositories: results = [self.parse_repository(d, self._default_repository)] elif isinstance(repositories, dict): repositories = repositories.get("repository") or [] if not isinstance(repositories, list): repositories = [repositories] results = [self.parse_repository(d, repo) for repo in repositories] else: results = [] return [res for res in results if res] or None def parse_repository(self, d, repo): if not isinstance(repo, dict): return if repo.get("layout", "default") != "default": return # TODO ? url = repo.get("url") group_id = d.get("groupId") artifact_id = d.get("artifactId") if ( isinstance(url, str) and isinstance(group_id, str) and isinstance(artifact_id, str) ): repo = os.path.join(url, *group_id.split("."), artifact_id) return {"@id": repo} def normalize_groupId(self, id_): """https://maven.apache.org/pom.html#Maven_Coordinates >>> MavenMapping().normalize_groupId('org.example') {'@id': 'org.example'} """ if isinstance(id_, str): return {"@id": id_} def parse_licenses(self, d): """https://maven.apache.org/pom.html#Licenses >>> import xmltodict >>> import json >>> d = xmltodict.parse(''' ... <licenses> ... <license> ... <name>Apache License, Version 2.0</name> ... <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url> ... </license> ... </licenses> ... ''') >>> print(json.dumps(d, indent=4)) { "licenses": { "license": { "name": "Apache License, Version 2.0", "url": "https://www.apache.org/licenses/LICENSE-2.0.txt" } } } >>> MavenMapping().parse_licenses(d) [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}] or, if there is more than one license: >>> import xmltodict >>> from pprint import pprint >>> d = xmltodict.parse(''' ... <licenses> ... <license> ... <name>Apache License, Version 2.0</name> ... <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url> ... </license> ... <license> ... <name>MIT License</name> ... <url>https://opensource.org/licenses/MIT</url> ... </license> ... </licenses> ...
''') >>> pprint(MavenMapping().parse_licenses(d)) [{'@id': 'https://www.apache.org/licenses/LICENSE-2.0.txt'}, {'@id': 'https://opensource.org/licenses/MIT'}] """ licenses = d.get("licenses") if not isinstance(licenses, dict): return licenses = licenses.get("license") if isinstance(licenses, dict): licenses = [licenses] elif not isinstance(licenses, list): return return [ {"@id": license["url"]} for license in licenses if isinstance(license, dict) and isinstance(license.get("url"), str) ] or None diff --git a/swh/indexer/metadata_dictionary/npm.py b/swh/indexer/metadata_dictionary/npm.py index 2b3916a..d05d355 100644 --- a/swh/indexer/metadata_dictionary/npm.py +++ b/swh/indexer/metadata_dictionary/npm.py @@ -1,228 +1,228 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI -from .base import JsonMapping, SingleFileMapping +from .base import JsonMapping, SingleFileIntrinsicMapping -class NpmMapping(JsonMapping, SingleFileMapping): +class NpmMapping(JsonMapping, SingleFileIntrinsicMapping): """ dedicated class for NPM (package.json) mapping and translation """ name = "npm" mapping = CROSSWALK_TABLE["NodeJS"] filename = b"package.json" string_fields = ["name", "version", "homepage", "description", "email"] _schema_shortcuts = { "github": "git+https://github.com/%s.git", "gist": "git+https://gist.github.com/%s.git", "gitlab": "git+https://gitlab.com/%s.git", # Bitbucket supports both hg and git, and the shortcut does not # tell which one to use. # 'bitbucket': 'https://bitbucket.org/', } def normalize_repository(self, d): """https://docs.npmjs.com/files/package.json#repository >>> NpmMapping().normalize_repository({ ... 'type': 'git', ... 'url': 'https://example.org/foo.git' ... }) {'@id': 'git+https://example.org/foo.git'} >>> NpmMapping().normalize_repository( ... 'gitlab:foo/bar') {'@id': 'git+https://gitlab.com/foo/bar.git'} >>> NpmMapping().normalize_repository( ... 'foo/bar') {'@id': 'git+https://github.com/foo/bar.git'} """ if ( isinstance(d, dict) and isinstance(d.get("type"), str) and isinstance(d.get("url"), str) ): url = "{type}+{url}".format(**d) elif isinstance(d, str): if "://" in d: url = d elif ":" in d: (schema, rest) = d.split(":", 1) if schema in self._schema_shortcuts: url = self._schema_shortcuts[schema] % rest else: return None else: url = self._schema_shortcuts["github"] % d else: return None return {"@id": url} def normalize_bugs(self, d): """https://docs.npmjs.com/files/package.json#bugs >>> NpmMapping().normalize_bugs({ ... 'url': 'https://example.org/bugs/', ... 'email': 'bugs@example.org' ... }) {'@id': 'https://example.org/bugs/'} >>> NpmMapping().normalize_bugs( ... 'https://example.org/bugs/') {'@id': 'https://example.org/bugs/'} """ if isinstance(d, dict) and isinstance(d.get("url"), str): return {"@id": d["url"]} elif isinstance(d, str): return {"@id": d} else: return None _parse_author = re.compile( r"^ *" r"(?P<name>.*?)" r"( +<(?P<email>.*)>)?" r"( +\((?P<url>.*)\))?" r" *$" ) def normalize_author(self, d): """https://docs.npmjs.com/files/package.json#people-fields-author-contributors' >>> from pprint import pprint >>> pprint(NpmMapping().normalize_author({ ... 'name': 'John Doe', ... 'email': 'john.doe@example.org', ... 'url': 'https://example.org/~john.doe', ...
})) {'@list': [{'@type': 'http://schema.org/Person', 'http://schema.org/email': 'john.doe@example.org', 'http://schema.org/name': 'John Doe', 'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]} >>> pprint(NpmMapping().normalize_author( ... 'John Doe <john.doe@example.org> (https://example.org/~john.doe)' ... )) {'@list': [{'@type': 'http://schema.org/Person', 'http://schema.org/email': 'john.doe@example.org', 'http://schema.org/name': 'John Doe', 'http://schema.org/url': {'@id': 'https://example.org/~john.doe'}}]} """ # noqa author = {"@type": SCHEMA_URI + "Person"} if isinstance(d, dict): name = d.get("name", None) email = d.get("email", None) url = d.get("url", None) elif isinstance(d, str): match = self._parse_author.match(d) if not match: return None name = match.group("name") email = match.group("email") url = match.group("url") else: return None if name and isinstance(name, str): author[SCHEMA_URI + "name"] = name if email and isinstance(email, str): author[SCHEMA_URI + "email"] = email if url and isinstance(url, str): author[SCHEMA_URI + "url"] = {"@id": url} return {"@list": [author]} def normalize_description(self, description): r"""Try to re-decode ``description`` as UTF-16, as this is a somewhat common mistake that causes issues in the database because of null bytes in JSON. >>> NpmMapping().normalize_description("foo bar") 'foo bar' >>> NpmMapping().normalize_description( ... "\ufffd\ufffd#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 \x00" ... ) 'foo bar' >>> NpmMapping().normalize_description( ... "\ufffd\ufffd\x00#\x00 \x00f\x00o\x00o\x00 \x00b\x00a\x00r\x00\r\x00 " ... ) 'foo bar' >>> NpmMapping().normalize_description( ... # invalid UTF-16 and meaningless UTF-8: ... "\ufffd\ufffd\x00#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description( ... # ditto (it looks like little-endian at first) ... "\ufffd\ufffd#\x00\x00\x00 \x00\x00\x00\x00f\x00\x00\x00\x00\x00" ... ) is None True >>> NpmMapping().normalize_description(None) is None True """ if not isinstance(description, str): return None # XXX: if this function ever needs to support more cases, consider # switching to https://pypi.org/project/ftfy/ instead of adding more hacks if description.startswith("\ufffd\ufffd") and "\x00" in description: # 2 unicode replacement characters followed by '# ' encoded as UTF-16 # is a common mistake, which indicates a README.md was saved as UTF-16, # and some NPM tool opened it as UTF-8 and used the first line as # description. description_bytes = description.encode() # Strip the two unicode replacement characters assert description_bytes.startswith(b"\xef\xbf\xbd\xef\xbf\xbd") description_bytes = description_bytes[6:] # If the following attempts fail to recover the description, discard it # entirely because the current indexer storage backend (postgresql) cannot # store zero bytes in JSON columns.
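# A leading null byte indicates big-endian UTF-16, so the little-endian decoding is only attempted when it is absent.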
description = None if not description_bytes.startswith(b"\x00"): # try UTF-16 little-endian (the most common) first try: description = description_bytes.decode("utf-16le") except UnicodeDecodeError: pass if description is None: # if it fails, try UTF-16 big-endian try: description = description_bytes.decode("utf-16be") except UnicodeDecodeError: pass if description: if description.startswith("# "): description = description[2:] return description.rstrip() return description def normalize_license(self, s): """https://docs.npmjs.com/files/package.json#license >>> NpmMapping().normalize_license('MIT') {'@id': 'https://spdx.org/licenses/MIT'} """ if isinstance(s, str): return {"@id": "https://spdx.org/licenses/" + s} def normalize_homepage(self, s): """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_homepage('https://example.org/~john.doe') {'@id': 'https://example.org/~john.doe'} """ if isinstance(s, str): return {"@id": s} def normalize_keywords(self, lst): """https://docs.npmjs.com/files/package.json#homepage >>> NpmMapping().normalize_keywords(['foo', 'bar']) ['foo', 'bar'] """ if isinstance(lst, list): return [x for x in lst if isinstance(x, str)] diff --git a/swh/indexer/metadata_dictionary/python.py b/swh/indexer/metadata_dictionary/python.py index b308294..686deed 100644 --- a/swh/indexer/metadata_dictionary/python.py +++ b/swh/indexer/metadata_dictionary/python.py @@ -1,76 +1,76 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import email.parser import email.policy import itertools from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI -from .base import DictMapping, SingleFileMapping +from .base import DictMapping, SingleFileIntrinsicMapping _normalize_pkginfo_key = str.lower class LinebreakPreservingEmailPolicy(email.policy.EmailPolicy): def header_fetch_parse(self, name, value): if hasattr(value, "name"): return value value = value.replace("\n ", "\n") return self.header_factory(name, value) -class PythonPkginfoMapping(DictMapping, SingleFileMapping): +class PythonPkginfoMapping(DictMapping, SingleFileIntrinsicMapping): """Dedicated class for Python's PKG-INFO mapping and translation. 
https://www.python.org/dev/peps/pep-0314/""" name = "pkg-info" filename = b"PKG-INFO" mapping = { _normalize_pkginfo_key(k): v for (k, v) in CROSSWALK_TABLE["Python PKG-INFO"].items() } string_fields = [ "name", "version", "description", "summary", "author", "author-email", ] _parser = email.parser.BytesHeaderParser(policy=LinebreakPreservingEmailPolicy()) def translate(self, content): msg = self._parser.parsebytes(content) d = {} for (key, value) in msg.items(): key = _normalize_pkginfo_key(key) if value != "UNKNOWN": d.setdefault(key, []).append(value) metadata = self._translate_dict(d, normalize=False) if SCHEMA_URI + "author" in metadata or SCHEMA_URI + "email" in metadata: metadata[SCHEMA_URI + "author"] = { "@list": [ { "@type": SCHEMA_URI + "Person", SCHEMA_URI + "name": metadata.pop(SCHEMA_URI + "author", [None])[0], SCHEMA_URI + "email": metadata.pop(SCHEMA_URI + "email", [None])[0], } ] } return self.normalize_translation(metadata) def normalize_home_page(self, urls): return [{"@id": url} for url in urls] def normalize_keywords(self, keywords): return list(itertools.chain.from_iterable(s.split(" ") for s in keywords)) def normalize_license(self, licenses): return [{"@id": license} for license in licenses] diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py index 55ce217..bdb06aa 100644 --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -1,140 +1,135 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import itertools import re -from typing import List, Tuple +from typing import List from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.storage.interface import Sha1 -from .base import DictMapping +from .base import BaseIntrinsicMapping, DictMapping def name_to_person(name): return { "@type": SCHEMA_URI + "Person", SCHEMA_URI + "name": name, } -class GemspecMapping(DictMapping): +class GemspecMapping(BaseIntrinsicMapping, DictMapping): name = "gemspec" mapping = CROSSWALK_TABLE["Ruby Gem"] string_fields = ["name", "version", "description", "summary", "email"] _re_spec_new = re.compile(r".*Gem::Specification.new +(do|\{) +\|.*\|.*") _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].endswith(b".gemspec"): return [entry["sha1"]] return [] - @classmethod - def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: - # this class is only used by intrinsic metadata mappings - return () - def translate(self, raw_content): try: raw_content = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return # Skip lines before 'Gem::Specification.new' lines = itertools.dropwhile( lambda x: not self._re_spec_new.match(x), raw_content.split("\n") ) try: next(lines) # Consume 'Gem::Specification.new' except StopIteration: self.log.warning("Could not find Gem::Specification in %s", self.log_suffix) return content_dict = {} for line in lines: match = self._re_spec_entry.match(line) if match: value = self.eval_ruby_expression(match.group("expr")) if value: content_dict[match.group("key")] = value return
self._translate_dict(content_dict) def eval_ruby_expression(self, expr): """Very simple evaluator of Ruby expressions. >>> GemspecMapping().eval_ruby_expression('"Foo bar"') 'Foo bar' >>> GemspecMapping().eval_ruby_expression("'Foo bar'") 'Foo bar' >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']") ['Foo', 'bar'] >>> GemspecMapping().eval_ruby_expression("'Foo bar'.freeze") 'Foo bar' >>> GemspecMapping().eval_ruby_expression( \ "['Foo'.freeze, 'bar'.freeze]") ['Foo', 'bar'] """ def evaluator(node): if isinstance(node, ast.Str): return node.s elif isinstance(node, ast.List): res = [] for element in node.elts: val = evaluator(element) if not val: return res.append(val) return res expr = expr.replace(".freeze", "") try: # We're parsing Ruby expressions here, but Python's # ast.parse works for very simple Ruby expressions # (mainly strings delimited with " or ', and lists # of such strings). tree = ast.parse(expr, mode="eval") except (SyntaxError, ValueError): return if isinstance(tree, ast.Expression): return evaluator(tree.body) def normalize_homepage(self, s): if isinstance(s, str): return {"@id": s} def normalize_license(self, s): if isinstance(s, str): return [{"@id": "https://spdx.org/licenses/" + s}] def normalize_licenses(self, licenses): if isinstance(licenses, list): return [ {"@id": "https://spdx.org/licenses/" + license} for license in licenses if isinstance(license, str) ] def normalize_author(self, author): if isinstance(author, str): return {"@list": [name_to_person(author)]} def normalize_authors(self, authors): if isinstance(authors, list): return { "@list": [ name_to_person(author) for author in authors if isinstance(author, str) ] }