diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index ac0920b..eeecbdc 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,450 +1,452 @@
 # Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from copy import deepcopy
 from typing import (
     Any,
     Callable,
     Dict,
     Iterable,
     Iterator,
     List,
     Optional,
     Tuple,
     TypeVar,
+    cast,
 )

 import sentry_sdk

 from swh.core.config import merge_configs
 from swh.core.utils import grouper
 from swh.indexer.codemeta import merge_documents
 from swh.indexer.indexer import ContentIndexer, DirectoryIndexer, OriginIndexer
 from swh.indexer.metadata_detector import detect_metadata
 from swh.indexer.metadata_dictionary import MAPPINGS
+from swh.indexer.metadata_dictionary.base import DirectoryLsEntry
 from swh.indexer.origin_head import get_head_swhid
 from swh.indexer.storage import INDEXER_CFG_KEY, Sha1
 from swh.indexer.storage.model import (
     ContentMetadataRow,
     DirectoryIntrinsicMetadataRow,
     OriginIntrinsicMetadataRow,
 )
 from swh.model import hashutil
 from swh.model.model import Directory
 from swh.model.model import ObjectType as ModelObjectType
 from swh.model.model import Origin, Sha1Git
 from swh.model.swhids import CoreSWHID, ObjectType

 REVISION_GET_BATCH_SIZE = 10
 RELEASE_GET_BATCH_SIZE = 10
 ORIGIN_GET_BATCH_SIZE = 10


 T1 = TypeVar("T1")
 T2 = TypeVar("T2")


 def call_with_batches(
     f: Callable[[List[T1]], Iterable[T2]],
     args: List[T1],
     batch_size: int,
 ) -> Iterator[T2]:
     """Calls a function with batches of args, and concatenates the results."""
     groups = grouper(args, batch_size)
     for group in groups:
         yield from f(list(group))


 class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]):
     """Content-level indexer

     This indexer is in charge of:

     - filtering out content already indexed in content_metadata
     - reading content from objstorage with the content's id sha1
     - computing metadata by given context
     - using the metadata_dictionary as the 'swh-metadata-translator' tool
     - store result in content_metadata table
     """

     def filter(self, ids):
         """Filter out known sha1s and return only missing ones."""
         yield from self.idx_storage.content_metadata_missing(
             (
                 {
                     "id": sha1,
                     "indexer_configuration_id": self.tool["id"],
                 }
                 for sha1 in ids
             )
         )

     def index(
         self,
         id: Sha1,
         data: Optional[bytes] = None,
         log_suffix="unknown directory",
         **kwargs,
     ) -> List[ContentMetadataRow]:
         """Index sha1s' content and store result.

         Args:
             id: content's identifier
             data: raw content in bytes

         Returns:
             dict: dictionary representing a content_metadata. If the
             translation wasn't successful the metadata keys will be
             returned as None

         """
         assert isinstance(id, bytes)
         assert data is not None
         metadata = None
         try:
             mapping_name = self.tool["tool_configuration"]["context"]
             log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id)
             metadata = MAPPINGS[mapping_name](log_suffix).translate(data)
         except Exception:
             self.log.exception(
                 "Problem during metadata translation "
                 "for content %s" % hashutil.hash_to_hex(id)
             )
             sentry_sdk.capture_exception()
         if metadata is None:
             return []
         return [
             ContentMetadataRow(
                 id=id,
                 indexer_configuration_id=self.tool["id"],
                 metadata=metadata,
             )
         ]

     def persist_index_computations(
         self, results: List[ContentMetadataRow]
     ) -> Dict[str, int]:
         """Persist the results in storage.

         Args:
             results: list of content_metadata, dict with the following keys:

               - id (bytes): content's identifier (sha1)
               - metadata (jsonb): detected metadata

         """
         return self.idx_storage.content_metadata_add(results)


 DEFAULT_CONFIG: Dict[str, Any] = {
     "tools": {
         "name": "swh-metadata-detector",
         "version": "0.0.2",
         "configuration": {},
     },
 }


 class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]):
     """Directory-level indexer

     This indexer is in charge of:

     - filtering directories already indexed in directory_intrinsic_metadata
       table with defined computation tool
     - retrieve all entry_files in directory
     - use metadata_detector for file_names containing metadata
     - compute metadata translation if necessary and possible (depends on tool)
     - send sha1s to content indexing if possible
     - store the results for directory
     """

     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.config = merge_configs(DEFAULT_CONFIG, self.config)

     def filter(self, sha1_gits):
         """Filter out known sha1s and return only missing ones."""
         yield from self.idx_storage.directory_intrinsic_metadata_missing(
             (
                 {
                     "id": sha1_git,
                     "indexer_configuration_id": self.tool["id"],
                 }
                 for sha1_git in sha1_gits
             )
         )

     def index(
         self, id: Sha1Git, data: Optional[Directory] = None, **kwargs
     ) -> List[DirectoryIntrinsicMetadataRow]:
         """Index directory by processing it and organizing result.

-        use metadata_detector to iterate on filenames
-
-        - if one filename detected -> sends file to content indexer
-        - if multiple file detected -> translation needed at directory level
+        Uses metadata_detector to iterate on filenames, passes them to the
+        content indexers, then merges the results (if there is more than one).

         Args:
             id: sha1_git of the directory
-            data: directory model object from storage
+            data: should always be None

         Returns:
             dict: dictionary representing a directory_intrinsic_metadata, with
             keys:

             - id: directory's identifier (sha1_git)
             - indexer_configuration_id (bytes): tool used
             - metadata: dict of retrieved metadata

         """
-        if data is None:
-            dir_ = list(self.storage.directory_ls(id, recursive=False))
-        else:
-            assert isinstance(data, Directory)
-            dir_ = data.to_dict()
+        dir_: List[DirectoryLsEntry]
+        assert data is None, "Unexpected directory object"
+        dir_ = cast(
+            List[DirectoryLsEntry],
+            list(self.storage.directory_ls(id, recursive=False)),
+        )

         try:
             if [entry["type"] for entry in dir_] == ["dir"]:
                 # If the root is just a single directory, recurse into it
                 # eg. PyPI packages, GNU tarballs
                 subdir = dir_[0]["target"]
-                dir_ = list(self.storage.directory_ls(subdir, recursive=False))
+                dir_ = cast(
+                    List[DirectoryLsEntry],
+                    list(self.storage.directory_ls(subdir, recursive=False)),
+                )
             files = [entry for entry in dir_ if entry["type"] == "file"]
-            detected_files = detect_metadata(files)
             (mappings, metadata) = self.translate_directory_intrinsic_metadata(
-                detected_files,
+                files,
                 log_suffix="directory=%s" % hashutil.hash_to_hex(id),
             )
         except Exception as e:
             self.log.exception("Problem when indexing dir: %r", e)
             sentry_sdk.capture_exception()
+            return []
         return [
             DirectoryIntrinsicMetadataRow(
                 id=id,
                 indexer_configuration_id=self.tool["id"],
                 mappings=mappings,
                 metadata=metadata,
             )
         ]

     def persist_index_computations(
         self, results: List[DirectoryIntrinsicMetadataRow]
     ) -> Dict[str, int]:
         """Persist the results in storage.

         Args:
             results: list of directory_intrinsic_metadata, dict with the
               following keys:

               - id (bytes): directory's identifier (sha1_git)
               - mappings (list): names of the mappings used
               - metadata (jsonb): detected metadata

         """
         # TODO: add functions in storage to keep data in
         # directory_intrinsic_metadata
         return self.idx_storage.directory_intrinsic_metadata_add(results)

     def translate_directory_intrinsic_metadata(
-        self, detected_files: Dict[str, List[Any]], log_suffix: str
+        self, files: List[DirectoryLsEntry], log_suffix: str
     ) -> Tuple[List[Any], Any]:
         """
-        Determine plan of action to translate metadata when containing
-        one or multiple detected files:
+        Determine plan of action to translate metadata in the given root directory

         Args:
-            detected_files: dictionary mapping context names (e.g.,
-              "npm", "authors") to list of sha1
+            files: list of file entries, as returned by
+              :meth:`swh.storage.interface.StorageInterface.directory_ls`

         Returns:
             (List[str], dict): list of mappings used and dict with
             translated metadata according to the CodeMeta vocabulary

         """
-        used_mappings = [MAPPINGS[context].name for context in detected_files]
         metadata = []
         tool = {
             "name": "swh-metadata-translator",
             "version": "0.0.2",
             "configuration": {},
         }
         # TODO: iterate on each context, on each file
         # -> get raw_contents
         # -> translate each content
         config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]}
         config["tools"] = [tool]
-        for context in detected_files.keys():
+        all_detected_files = detect_metadata(files)
+        used_mappings = [MAPPINGS[context].name for context in all_detected_files]
+        for (mapping_name, detected_files) in all_detected_files.items():
             cfg = deepcopy(config)
-            cfg["tools"][0]["configuration"]["context"] = context
+            cfg["tools"][0]["configuration"]["context"] = mapping_name
             c_metadata_indexer = ContentMetadataIndexer(config=cfg)
             # sha1s that are in content_metadata table
             sha1s_in_storage = []
-            metadata_generator = self.idx_storage.content_metadata_get(
-                detected_files[context]
-            )
+            metadata_generator = self.idx_storage.content_metadata_get(detected_files)
             for c in metadata_generator:
                 # extracting metadata
                 sha1 = c.id
                 sha1s_in_storage.append(sha1)
                 local_metadata = c.metadata
                 # local metadata is aggregated
                 if local_metadata:
                     metadata.append(local_metadata)

             sha1s_filtered = [
-                item for item in detected_files[context] if item not in sha1s_in_storage
+                item for item in detected_files if item not in sha1s_in_storage
             ]

             if sha1s_filtered:
                 # content indexing
                 try:
                     c_metadata_indexer.run(
                         sha1s_filtered,
                         log_suffix=log_suffix,
                     )
                     # on the fly possibility:
                     for result in c_metadata_indexer.results:
                         local_metadata = result.metadata
                         metadata.append(local_metadata)

                 except Exception:
                     self.log.exception("Exception while indexing metadata on contents")
                     sentry_sdk.capture_exception()

         metadata = merge_documents(metadata)
         return (used_mappings, metadata)


 class OriginMetadataIndexer(
     OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]
 ):
     USE_TOOLS = False

     def __init__(self, config=None, **kwargs) -> None:
         super().__init__(config=config, **kwargs)
         self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config)

     def index_list(
         self, origins: List[Origin], check_origin_known: bool = True, **kwargs
     ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]:
         head_rev_ids = []
         head_rel_ids = []
         origin_heads: Dict[Origin, CoreSWHID] = {}

         # Filter out origins not in the storage
         if check_origin_known:
             known_origins = list(
                 call_with_batches(
                     self.storage.origin_get,
                     [origin.url for origin in origins],
                     ORIGIN_GET_BATCH_SIZE,
                 )
             )
         else:
             known_origins = list(origins)

         for origin in known_origins:
             if origin is None:
                 continue
             head_swhid = get_head_swhid(self.storage, origin.url)
             if head_swhid:
                 origin_heads[origin] = head_swhid
                 if head_swhid.object_type == ObjectType.REVISION:
                     head_rev_ids.append(head_swhid.object_id)
                 elif head_swhid.object_type == ObjectType.RELEASE:
                     head_rel_ids.append(head_swhid.object_id)
                 else:
                     assert False, head_swhid

         head_revs = dict(
             zip(
                 head_rev_ids,
                 call_with_batches(
                     self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE
                 ),
             )
         )
         head_rels = dict(
             zip(
                 head_rel_ids,
                 call_with_batches(
                     self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE
                 ),
             )
         )

         results = []
         for (origin, head_swhid) in origin_heads.items():
             if head_swhid.object_type == ObjectType.REVISION:
                 rev = head_revs[head_swhid.object_id]
                 if not rev:
                     self.log.warning(
                         "Missing head object %s of origin %r", head_swhid, origin.url
                     )
                     continue
                 directory_id = rev.directory
             elif head_swhid.object_type == ObjectType.RELEASE:
                 rel = head_rels[head_swhid.object_id]
                 if not rel:
                     self.log.warning(
                         "Missing head object %s of origin %r", head_swhid, origin.url
                     )
                     continue
                 if rel.target_type != ModelObjectType.DIRECTORY:
                     # TODO
                     self.log.warning(
                         "Head release %s of %r has unexpected target type %s",
                         head_swhid,
                         origin.url,
                         rel.target_type,
                     )
                     continue
                 assert rel.target, rel
                 directory_id = rel.target
             else:
                 assert False, head_swhid

             for dir_metadata in self.directory_metadata_indexer.index(directory_id):
                 # There is at most one dir_metadata
                 orig_metadata = OriginIntrinsicMetadataRow(
                     from_directory=dir_metadata.id,
                     id=origin.url,
                     metadata=dir_metadata.metadata,
                     mappings=dir_metadata.mappings,
                     indexer_configuration_id=dir_metadata.indexer_configuration_id,
                 )
                 results.append((orig_metadata, dir_metadata))
         return results

     def persist_index_computations(
         self,
         results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]],
     ) -> Dict[str, int]:
         # Deduplicate directories
         dir_metadata: List[DirectoryIntrinsicMetadataRow] = []
         orig_metadata: List[OriginIntrinsicMetadataRow] = []
         summary: Dict = {}
         for (orig_item, dir_item) in results:
             assert dir_item.metadata == orig_item.metadata
             if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}):
                 # Only store non-empty metadata sets
                 if dir_item not in dir_metadata:
                     dir_metadata.append(dir_item)
                 if orig_item not in orig_metadata:
                     orig_metadata.append(orig_item)

         if dir_metadata:
             summary_dir = self.idx_storage.directory_intrinsic_metadata_add(
                 dir_metadata
             )
             summary.update(summary_dir)
         if orig_metadata:
             summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata)
             summary.update(summary_ori)

         return summary
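
As context for the batched storage reads above: call_with_batches simply splits its argument list into fixed-size chunks and chains the per-batch results. A minimal equivalent sketch, using a hypothetical fetch callable in place of a storage getter such as self.storage.revision_get:

from itertools import islice
from typing import Iterator, List, TypeVar

T = TypeVar("T")


def fetch(ids: List[int]) -> List[str]:
    # Hypothetical stand-in for a storage getter such as revision_get
    return ["object-%d" % i for i in ids]


def batched(items: List[T], size: int) -> Iterator[List[T]]:
    # Yield fixed-size chunks of the input list
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk


# Equivalent to call_with_batches(fetch, list(range(25)), 10): three
# requests of at most 10 ids each, results chained in order.
results = [r for batch in batched(list(range(25)), 10) for r in fetch(batch)]
assert results == fetch(list(range(25)))
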
diff --git a/swh/indexer/metadata_detector.py b/swh/indexer/metadata_detector.py
index b8e99b5..e574b31 100644
--- a/swh/indexer/metadata_detector.py
+++ b/swh/indexer/metadata_detector.py
@@ -1,24 +1,28 @@
-# Copyright (C) 2017 The Software Heritage developers
+# Copyright (C) 2017-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from typing import Dict, List
+
 from swh.indexer.metadata_dictionary import MAPPINGS
+from swh.indexer.metadata_dictionary.base import DirectoryLsEntry
+from swh.indexer.storage.interface import Sha1


-def detect_metadata(files):
+def detect_metadata(files: List[DirectoryLsEntry]) -> Dict[str, List[Sha1]]:
     """
     Detects files potentially containing metadata

     Args:
         files: list of file entries

     Returns:
         dict: {mapping_filenames[name]:f['sha1']} (may be empty)
     """
     results = {}
     for (mapping_name, mapping) in MAPPINGS.items():
         matches = mapping.detect_metadata_files(files)
         if matches:
             results[mapping_name] = matches
     return results
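
To make the new detect_metadata signature concrete, here is a rough usage sketch; the directory entries and sha1 values are fabricated, and the result keys follow however MAPPINGS is indexed (mapping names such as "NpmMapping" below are illustrative):

from swh.indexer.metadata_detector import detect_metadata

# Fabricated directory listing; sha1/target values are placeholders.
entries = [
    {"type": "file", "name": b"package.json", "sha1": b"\x01" * 20, "target": b"\x01" * 20},
    {"type": "file", "name": b"README.md", "sha1": b"\x02" * 20, "target": b"\x02" * 20},
    {"type": "file", "name": b"example.gemspec", "sha1": b"\x03" * 20, "target": b"\x03" * 20},
]

detected = detect_metadata(entries)
# Expected shape: one entry per mapping that recognized a file, e.g.
# {"NpmMapping": [b"\x01" * 20], "GemspecMapping": [b"\x03" * 20]}
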
diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py
index 774875c..4169937 100644
--- a/swh/indexer/metadata_dictionary/base.py
+++ b/swh/indexer/metadata_dictionary/base.py
@@ -1,180 +1,184 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import json
 import logging
 from typing import Any, Dict, List, Optional

+from typing_extensions import TypedDict
+
 from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values
+from swh.indexer.storage.interface import Sha1
+
+
+class DirectoryLsEntry(TypedDict):
+    target: Sha1
+    sha1: Sha1
+    name: bytes
+    type: str


 class BaseMapping:
     """Base class for mappings to inherit from

     To implement a new mapping:

     - inherit this class
     - override translate function
     """

     def __init__(self, log_suffix=""):
         self.log_suffix = log_suffix
         self.log = logging.getLogger(
             "%s.%s" % (self.__class__.__module__, self.__class__.__name__)
         )

     @property
     def name(self):
         """A name of this mapping, used as an identifier in the
         indexer storage."""
         raise NotImplementedError(f"{self.__class__.__name__}.name")

     @classmethod
-    def detect_metadata_files(cls, files: List[Dict[str, str]]) -> List[str]:
+    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         """
-        Detects files potentially containing metadata
-
-        Args:
-            file_entries (list): list of files
-
-        Returns:
-            list: list of sha1 (possibly empty)
+        Returns the sha1 hashes of files which can be translated by this mapping
         """
         raise NotImplementedError(f"{cls.__name__}.detect_metadata_files")

     def translate(self, file_content: bytes) -> Optional[Dict]:
         raise NotImplementedError(f"{self.__class__.__name__}.translate")

     def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
         return compact(metadata)


 class SingleFileMapping(BaseMapping):
     """Base class for all mappings that use a single file as input."""

     @property
     def filename(self):
         """The .json file to extract metadata from."""
         raise NotImplementedError(f"{self.__class__.__name__}.filename")

     @classmethod
-    def detect_metadata_files(cls, file_entries: List[Dict[str, str]]) -> List[str]:
+    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         for entry in file_entries:
             if entry["name"].lower() == cls.filename:
                 return [entry["sha1"]]
         return []


 class DictMapping(BaseMapping):
     """Base class for mappings that take as input a file that is mostly a
     key-value store (eg. a shallow JSON dict)."""

     string_fields = []  # type: List[str]
     """List of fields that are simple strings, and don't need any
     normalization."""

     @property
     def mapping(self):
         """A translation dict to map dict keys into a canonical name."""
         raise NotImplementedError(f"{self.__class__.__name__}.mapping")

     @staticmethod
     def _normalize_method_name(name: str) -> str:
         return name.replace("-", "_")

     @classmethod
     def supported_terms(cls):
         return {
             term
             for (key, term) in cls.mapping.items()
             if key in cls.string_fields
             or hasattr(cls, "translate_" + cls._normalize_method_name(key))
             or hasattr(cls, "normalize_" + cls._normalize_method_name(key))
         }

     def _translate_dict(
         self, content_dict: Dict, *, normalize: bool = True
     ) -> Dict[str, str]:
         """
         Translates content by parsing content from a dict object
         and translating with the appropriate mapping

         Args:
             content_dict (dict): content dict to translate

         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer

         """
         translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"}
         for k, v in content_dict.items():
             # First, check if there is a specific translation
             # method for this key
             translation_method = getattr(
                 self, "translate_" + self._normalize_method_name(k), None
             )
             if translation_method:
                 translation_method(translated_metadata, v)
             elif k in self.mapping:
                 # if there is no method, but the key is known from the
                 # crosswalk table
                 codemeta_key = self.mapping[k]

                 # if there is a normalization method, use it on the value
                 normalization_method = getattr(
                     self, "normalize_" + self._normalize_method_name(k), None
                 )
                 if normalization_method:
                     v = normalization_method(v)
                 elif k in self.string_fields and isinstance(v, str):
                     pass
                 elif k in self.string_fields and isinstance(v, list):
                     v = [x for x in v if isinstance(x, str)]
                 else:
                     continue

                 # set the translation metadata with the normalized value
                 if codemeta_key in translated_metadata:
                     translated_metadata[codemeta_key] = merge_values(
                         translated_metadata[codemeta_key], v
                     )
                 else:
                     translated_metadata[codemeta_key] = v

         if normalize:
             return self.normalize_translation(translated_metadata)
         else:
             return translated_metadata


 class JsonMapping(DictMapping, SingleFileMapping):
     """Base class for all mappings that use a JSON file as input."""

     def translate(self, raw_content: bytes) -> Optional[Dict]:
         """
         Translates content by parsing content from a bytestring containing
         json data and translating with the appropriate mapping

         Args:
             raw_content (bytes): raw content to translate

         Returns:
             dict: translated metadata in json-friendly form needed for
             the indexer

         """
         try:
             raw_content_string: str = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
             return None
         try:
             content_dict = json.loads(raw_content_string)
         except json.JSONDecodeError:
             self.log.warning("Error unjsoning from %s", self.log_suffix)
             return None
         if isinstance(content_dict, dict):
             return self._translate_dict(content_dict)
         return None
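
To illustrate how DictMapping, SingleFileMapping and JsonMapping compose, a toy mapping follows; ToyMapping, its filename, and its one-entry crosswalk dict are all invented for illustration and are not part of this patch:

from swh.indexer.codemeta import SCHEMA_URI
from swh.indexer.metadata_dictionary.base import JsonMapping


class ToyMapping(JsonMapping):
    """Invented mapping for a hypothetical toy.json metadata file."""

    name = "toy"
    filename = b"toy.json"  # compared (lowercased) against directory entry names
    string_fields = ["description"]
    # Fabricated one-entry crosswalk: maps the source key to a CodeMeta term
    mapping = {"description": SCHEMA_URI + "description"}


result = ToyMapping().translate(b'{"description": "An example", "junk": 42}')
# "junk" has no crosswalk entry, so it is dropped; "description" is a string
# field and passes through. After compaction, result should look roughly like:
# {"@context": "https://doi.org/10.5063/schema/codemeta-2.0",
#  "type": "SoftwareSourceCode", "description": "An example"}
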
diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py
index ad86c06..6dc3205 100644
--- a/swh/indexer/metadata_dictionary/ruby.py
+++ b/swh/indexer/metadata_dictionary/ruby.py
@@ -1,132 +1,135 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import ast
 import itertools
 import re
+from typing import List

 from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI
+from swh.indexer.metadata_dictionary.base import DirectoryLsEntry
+from swh.indexer.storage.interface import Sha1

 from .base import DictMapping


 def name_to_person(name):
     return {
         "@type": SCHEMA_URI + "Person",
         SCHEMA_URI + "name": name,
     }


 class GemspecMapping(DictMapping):
     name = "gemspec"
     mapping = CROSSWALK_TABLE["Ruby Gem"]
     string_fields = ["name", "version", "description", "summary", "email"]

     _re_spec_new = re.compile(r".*Gem::Specification.new +(do|\{) +\|.*\|.*")
     _re_spec_entry = re.compile(r"\s*\w+\.(?P<key>\w+)\s*=\s*(?P<expr>.*)")

     @classmethod
-    def detect_metadata_files(cls, file_entries):
+    def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]:
         for entry in file_entries:
             if entry["name"].endswith(b".gemspec"):
                 return [entry["sha1"]]
         return []

     def translate(self, raw_content):
         try:
             raw_content = raw_content.decode()
         except UnicodeDecodeError:
             self.log.warning("Error unidecoding from %s", self.log_suffix)
             return

         # Skip lines before 'Gem::Specification.new'
         lines = itertools.dropwhile(
             lambda x: not self._re_spec_new.match(x), raw_content.split("\n")
         )
         try:
             next(lines)  # Consume 'Gem::Specification.new'
         except StopIteration:
             self.log.warning("Could not find Gem::Specification in %s", self.log_suffix)
             return

         content_dict = {}
         for line in lines:
             match = self._re_spec_entry.match(line)
             if match:
                 value = self.eval_ruby_expression(match.group("expr"))
                 if value:
                     content_dict[match.group("key")] = value
         return self._translate_dict(content_dict)

     def eval_ruby_expression(self, expr):
         """Very simple evaluator of Ruby expressions.

         >>> GemspecMapping().eval_ruby_expression('"Foo bar"')
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression("'Foo bar'")
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']")
         ['Foo', 'bar']
         >>> GemspecMapping().eval_ruby_expression("'Foo bar'.freeze")
         'Foo bar'
         >>> GemspecMapping().eval_ruby_expression( \
                 "['Foo'.freeze, 'bar'.freeze]")
         ['Foo', 'bar']
         """

         def evaluator(node):
             if isinstance(node, ast.Str):
                 return node.s
             elif isinstance(node, ast.List):
                 res = []
                 for element in node.elts:
                     val = evaluator(element)
                     if not val:
                         return
                     res.append(val)
                 return res

         expr = expr.replace(".freeze", "")
         try:
             # We're parsing Ruby expressions here, but Python's
             # ast.parse works for very simple Ruby expressions
             # (mainly strings delimited with " or ', and lists
             # of such strings).
             tree = ast.parse(expr, mode="eval")
         except (SyntaxError, ValueError):
             return
         if isinstance(tree, ast.Expression):
             return evaluator(tree.body)

     def normalize_homepage(self, s):
         if isinstance(s, str):
             return {"@id": s}

     def normalize_license(self, s):
         if isinstance(s, str):
             return [{"@id": "https://spdx.org/licenses/" + s}]

     def normalize_licenses(self, licenses):
         if isinstance(licenses, list):
             return [
                 {"@id": "https://spdx.org/licenses/" + license}
                 for license in licenses
                 if isinstance(license, str)
             ]

     def normalize_author(self, author):
         if isinstance(author, str):
             return {"@list": [name_to_person(author)]}

     def normalize_authors(self, authors):
         if isinstance(authors, list):
             return {
                 "@list": [
                     name_to_person(author)
                     for author in authors
                     if isinstance(author, str)
                 ]
             }
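
Finally, a rough usage sketch for the gemspec mapping; the gemspec text is invented, and the exact CodeMeta keys in the output depend on the "Ruby Gem" crosswalk entries:

from swh.indexer.metadata_dictionary.ruby import GemspecMapping

raw = b"""
Gem::Specification.new do |s|
  s.name = 'example'
  s.version = '0.1.0'
  s.summary = 'An example gem'
  s.authors = ['Jane Doe']
end
"""

result = GemspecMapping().translate(raw)
# _re_spec_entry extracts {"name": "example", "version": "0.1.0",
# "summary": "An example gem", "authors": ["Jane Doe"]}; the string fields
# pass through unchanged, and "authors" is normalized by normalize_authors
# into a list of schema.org Person objects (assuming the "Ruby Gem"
# crosswalk maps an "authors" key).
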