diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py index 66eabc9..f02102c 100644 --- a/swh/indexer/indexer.py +++ b/swh/indexer/indexer.py @@ -1,657 +1,658 @@ # Copyright (C) 2016-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc from contextlib import contextmanager import logging import os import shutil import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar, Union, ) import warnings import sentry_sdk from typing_extensions import TypedDict from swh.core import utils from swh.core.config import load_from_envvar, merge_configs from swh.indexer.storage import INDEXER_CFG_KEY, Sha1, get_indexer_storage from swh.indexer.storage.interface import IndexerStorageInterface from swh.model import hashutil from swh.model.model import Directory, Origin, Sha1Git from swh.objstorage.exc import ObjNotFoundError from swh.objstorage.factory import get_objstorage from swh.scheduler import CONFIG as SWH_CONFIG from swh.storage import get_storage from swh.storage.interface import StorageInterface class ObjectsDict(TypedDict, total=False): directory: List[Dict] origin: List[Dict] origin_visit_status: List[Dict] + raw_extrinsic_metadata: List[Dict] @contextmanager def write_to_temp(filename: str, data: bytes, working_directory: str) -> Iterator[str]: """Write the sha1's content in a temporary file. Args: filename: one of sha1's many filenames data: the sha1's content to write in temporary file working_directory: the directory into which the file is written Returns: The path to the temporary file created. That file is filled in with the raw content's data. """ os.makedirs(working_directory, exist_ok=True) temp_dir = tempfile.mkdtemp(dir=working_directory) content_path = os.path.join(temp_dir, filename) with open(content_path, "wb") as f: f.write(data) yield content_path shutil.rmtree(temp_dir) DEFAULT_CONFIG = { INDEXER_CFG_KEY: {"cls": "memory"}, "storage": {"cls": "memory"}, "objstorage": {"cls": "memory"}, } TId = TypeVar("TId") """type of the ids of index()ed objects.""" TData = TypeVar("TData") """type of the objects passed to index().""" TResult = TypeVar("TResult") """return type of index()""" class BaseIndexer(Generic[TId, TData, TResult], metaclass=abc.ABCMeta): """Base class for indexers to inherit from. The main entry point is the :func:`run` function which is in charge of triggering the computations on the batch dict/ids received. Indexers can: - filter out ids whose data has already been indexed. - retrieve ids data from storage or objstorage - index this data depending on the object and store the result in storage. To implement a new object type indexer, inherit from the BaseIndexer and implement indexing: :meth:`~BaseIndexer.run`: object_ids are different depending on object. For example: sha1 for content, sha1_git for revision, directory, release, and id for origin To implement a new concrete indexer, inherit from the object level classes: :class:`ContentIndexer`, :class:`DirectoryIndexer`, :class:`OriginIndexer`. Then you need to implement the following functions: :meth:`~BaseIndexer.filter`: filter out data already indexed (in storage). :meth:`~BaseIndexer.index_object`: compute index on id with data (retrieved from the storage or the objstorage by the id key) and return the resulting index computation. :meth:`~BaseIndexer.persist_index_computations`: persist the results of multiple index computations in the storage. The new indexer implementation can also override the following functions: :meth:`~BaseIndexer.prepare`: Configuration preparation for the indexer. When overriding, this must call the `super().prepare()` instruction. :meth:`~BaseIndexer.check`: Configuration check for the indexer. When overriding, this must call the `super().check()` instruction. :meth:`~BaseIndexer.register_tools`: This should return a dict of the tool(s) to use when indexing or filtering. """ results: List[TResult] USE_TOOLS = True catch_exceptions = True """Prevents exceptions in `index()` from raising too high. Set to False in tests to properly catch all exceptions.""" scheduler: Any storage: StorageInterface objstorage: Any idx_storage: IndexerStorageInterface def __init__(self, config=None, **kw) -> None: """Prepare and check that the indexer is ready to run.""" super().__init__() if config is not None: self.config = config elif SWH_CONFIG: self.config = SWH_CONFIG.copy() else: self.config = load_from_envvar() self.config = merge_configs(DEFAULT_CONFIG, self.config) self.prepare() self.check() self.log.debug("%s: config=%s", self, self.config) def prepare(self) -> None: """Prepare the indexer's needed runtime configuration. Without this step, the indexer cannot possibly run. """ config_storage = self.config.get("storage") if config_storage: self.storage = get_storage(**config_storage) self.objstorage = get_objstorage(**self.config["objstorage"]) idx_storage = self.config[INDEXER_CFG_KEY] self.idx_storage = get_indexer_storage(**idx_storage) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) self.log = logging.getLogger("swh.indexer") if self.USE_TOOLS: self.tools = list(self.register_tools(self.config.get("tools", []))) self.results = [] @property def tool(self) -> Dict: return self.tools[0] def check(self) -> None: """Check the indexer's configuration is ok before proceeding. If ok, does nothing. If not raise error. """ if self.USE_TOOLS and not self.tools: raise ValueError("Tools %s is unknown, cannot continue" % self.tools) def _prepare_tool(self, tool: Dict[str, Any]) -> Dict[str, Any]: """Prepare the tool dict to be compliant with the storage api.""" return {"tool_%s" % key: value for key, value in tool.items()} def register_tools( self, tools: Union[Dict[str, Any], List[Dict[str, Any]]] ) -> List[Dict[str, Any]]: """Permit to register tools to the storage. Add a sensible default which can be overridden if not sufficient. (For now, all indexers use only one tool) Expects the self.config['tools'] property to be set with one or more tools. Args: tools: Either a dict or a list of dict. Returns: list: List of dicts with additional id key. Raises: ValueError: if not a list nor a dict. """ if isinstance(tools, list): tools = list(map(self._prepare_tool, tools)) elif isinstance(tools, dict): tools = [self._prepare_tool(tools)] else: raise ValueError("Configuration tool(s) must be a dict or list!") if tools: return self.idx_storage.indexer_configuration_add(tools) else: return [] def index(self, id: TId, data: Optional[TData], **kwargs) -> List[TResult]: """Index computation for the id and associated raw data. Args: id: identifier or Dict object data: id's data from storage or objstorage depending on object type Returns: dict: a dict that makes sense for the :meth:`.persist_index_computations` method. """ raise NotImplementedError() def filter(self, ids: List[TId]) -> Iterator[TId]: """Filter missing ids for that particular indexer. Args: ids: list of ids Yields: iterator of missing ids """ yield from ids @abc.abstractmethod def persist_index_computations(self, results: List[TResult]) -> Dict[str, int]: """Persist the computation resulting from the index. Args: results: List of results. One result is the result of the index function. Returns: a summary dict of what has been inserted in the storage """ return {} class ContentIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content indexer working on a list of ids directly. To work on indexer partition, use the :class:`ContentPartitionIndexer` instead. Note: :class:`ContentIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1], **kwargs) -> Dict: """Given a list of ids: - retrieve the content from the storage - execute the indexing computations - store the results Args: ids (Iterable[Union[bytes, str]]): sha1's identifier list **kwargs: passed to the `index` method Returns: A summary Dict of the task's status """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] sha1s = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] results = [] summary: Dict = {"status": "uneventful"} try: for sha1 in sha1s: try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning( "Content %s not found in objstorage" % hashutil.hash_to_hex(sha1) ) continue res = self.index(sha1, raw_content, **kwargs) if res: # If no results, skip it results.extend(res) summary["status"] = "eventful" summary = self.persist_index_computations(results) self.results = results except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when reading contents metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" return summary class ContentPartitionIndexer(BaseIndexer[Sha1, bytes, TResult], Generic[TResult]): """A content partition indexer. This expects as input a partition_id and a nb_partitions. This will then index the contents within that partition. To work on a list of ids, use the :class:`ContentIndexer` instead. Note: :class:`ContentPartitionIndexer` is not an instantiable object. To use it, one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ @abc.abstractmethod def indexed_contents_in_partition( self, partition_id: int, nb_partitions: int ) -> Iterable[Sha1]: """Retrieve indexed contents within range [start, end]. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into page_token: opaque token used for pagination """ pass def _list_contents_to_index( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1] ) -> Iterable[Sha1]: """Compute from storage the new contents to index in the partition_id . The already indexed contents are skipped. Args: partition_id: Index of the partition to fetch data from nb_partitions: Total number of partition indexed: Set of content already indexed. Yields: Sha1 id (bytes) of contents to index """ if not isinstance(partition_id, int) or not isinstance(nb_partitions, int): raise TypeError( f"identifiers must be int, not {partition_id!r} and {nb_partitions!r}." ) next_page_token = None while True: result = self.storage.content_get_partition( partition_id, nb_partitions, page_token=next_page_token ) contents = result.results for c in contents: _id = hashutil.hash_to_bytes(c.sha1) if _id in indexed: continue yield _id next_page_token = result.next_page_token if next_page_token is None: break def _index_contents( self, partition_id: int, nb_partitions: int, indexed: Set[Sha1], **kwargs: Any ) -> Iterator[TResult]: """Index the contents within the partition_id. Args: start: Starting bound from range identifier end: End range identifier indexed: Set of content already indexed. Yields: indexing result as dict to persist in the indexer backend """ for sha1 in self._list_contents_to_index(partition_id, nb_partitions, indexed): try: raw_content = self.objstorage.get(sha1) except ObjNotFoundError: self.log.warning(f"Content {sha1.hex()} not found in objstorage") continue yield from self.index(sha1, raw_content, **kwargs) def _index_with_skipping_already_done( self, partition_id: int, nb_partitions: int ) -> Iterator[TResult]: """Index not already indexed contents within the partition partition_id Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into Yields: indexing result as dict to persist in the indexer backend """ already_indexed_contents = set( self.indexed_contents_in_partition(partition_id, nb_partitions) ) return self._index_contents( partition_id, nb_partitions, already_indexed_contents ) def run( self, partition_id: int, nb_partitions: int, skip_existing: bool = True, **kwargs, ) -> Dict: """Given a partition of content ids, index the contents within. Either the indexer is incremental (filter out existing computed data) or it computes everything from scratch. Args: partition_id: Index of the partition to fetch nb_partitions: Total number of partitions to split into skip_existing: Skip existing indexed data (default) or not **kwargs: passed to the `index` method Returns: dict with the indexing task status """ summary: Dict[str, Any] = {"status": "uneventful"} count = 0 try: if skip_existing: gen = self._index_with_skipping_already_done( partition_id, nb_partitions ) else: gen = self._index_contents(partition_id, nb_partitions, indexed=set([])) count_object_added_key: Optional[str] = None for contents in utils.grouper(gen, n=self.config["write_batch_size"]): res = self.persist_index_computations(list(contents)) if not count_object_added_key: count_object_added_key = list(res.keys())[0] count += res[count_object_added_key] if count > 0: summary["status"] = "eventful" except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when computing metadata.") sentry_sdk.capture_exception() summary["status"] = "failed" if count > 0 and count_object_added_key: summary[count_object_added_key] = count return summary class OriginIndexer(BaseIndexer[str, None, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Origin indexing using the run method Note: the :class:`OriginIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, origin_urls: List[str], **kwargs) -> Dict: """Given a list of origin urls: - retrieve origins from storage - execute the indexing computations - store the results Args: origin_urls: list of origin urls. **kwargs: passed to the `index` method """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] origins = [{"url": url} for url in origin_urls] return self.process_journal_objects({"origin": origins}) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``.""" origins = [ Origin(url=status["origin"]) for status in objects.get("origin_visit_status", []) if status["status"] == "full" ] + [Origin(url=origin["url"]) for origin in objects.get("origin", [])] summary: Dict[str, Any] = {"status": "uneventful"} try: results = self.index_list( origins, check_origin_known=False, # no need to check they exist, as we just received either an origin or # visit status; which cannot be created by swh-storage unless the origin # already exists ) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary summary_persist = self.persist_index_computations(results) self.results = results if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index_list(self, origins: List[Origin], **kwargs) -> List[TResult]: results = [] for origin in origins: try: results.extend(self.index(origin.url, **kwargs)) except Exception: self.log.exception("Problem when processing origin %s", origin.url) sentry_sdk.capture_exception() raise return results class DirectoryIndexer(BaseIndexer[Sha1Git, Directory, TResult], Generic[TResult]): """An object type indexer, inherits from the :class:`BaseIndexer` and implements Directory indexing using the run method Note: the :class:`DirectoryIndexer` is not an instantiable object. To use it in another context one should inherit from this class and override the methods mentioned in the :class:`BaseIndexer` class. """ def run(self, ids: List[Sha1Git], **kwargs) -> Dict: """Given a list of sha1_gits: - retrieve directories from storage - execute the indexing computations - store the results Args: ids: sha1_git's identifier list """ if "policy_update" in kwargs: warnings.warn( "'policy_update' argument is deprecated and ignored.", DeprecationWarning, ) del kwargs["policy_update"] directory_ids = [ hashutil.hash_to_bytes(id_) if isinstance(id_, str) else id_ for id_ in ids ] return self._process_directories([(dir_id, None) for dir_id in directory_ids]) def process_journal_objects(self, objects: ObjectsDict) -> Dict: """Worker function for ``JournalClient``.""" return self._process_directories( [ (dir_["id"], Directory.from_dict(dir_)) for dir_ in objects.get("directory", []) ] ) def _process_directories( self, directories: Union[List[Tuple[Sha1Git, Directory]], List[Tuple[Sha1Git, None]]], ) -> Dict: summary: Dict[str, Any] = {"status": "uneventful"} results = [] # TODO: fetch raw_manifest when useful? for (dir_id, dir_) in directories: try: results.extend(self.index(dir_id, dir_)) except Exception: if not self.catch_exceptions: raise self.log.exception("Problem when processing directory") sentry_sdk.capture_exception() summary["status"] = "failed" summary_persist = self.persist_index_computations(results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) self.results = results return summary diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index eeecbdc..c5ad56f 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,452 +1,536 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, cast, ) +from urllib.parse import urlparse import sentry_sdk from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents -from swh.indexer.indexer import ContentIndexer, DirectoryIndexer, OriginIndexer +from swh.indexer.indexer import ( + BaseIndexer, + ContentIndexer, + DirectoryIndexer, + ObjectsDict, + OriginIndexer, +) from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.origin_head import get_head_swhid from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, DirectoryIntrinsicMetadataRow, + OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.model import hashutil -from swh.model.model import Directory +from swh.model.model import Directory, MetadataAuthorityType from swh.model.model import ObjectType as ModelObjectType -from swh.model.model import Origin, Sha1Git -from swh.model.swhids import CoreSWHID, ObjectType +from swh.model.model import Origin, RawExtrinsicMetadata, Sha1Git +from swh.model.swhids import CoreSWHID, ExtendedObjectType, ObjectType REVISION_GET_BATCH_SIZE = 10 RELEASE_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) +class ExtrinsicMetadataIndexer( + BaseIndexer[Sha1Git, RawExtrinsicMetadata, OriginExtrinsicMetadataRow] +): + def process_journal_objects(self, objects: ObjectsDict) -> Dict: + summary: Dict[str, Any] = {"status": "uneventful"} + try: + results = [] + for item in objects.get("raw_extrinsic_metadata", []): + results.extend( + self.index(item["id"], data=RawExtrinsicMetadata.from_dict(item)) + ) + except Exception: + if not self.catch_exceptions: + raise + summary["status"] = "failed" + return summary + + summary_persist = self.persist_index_computations(results) + self.results = results + if summary_persist: + for value in summary_persist.values(): + if value > 0: + summary["status"] = "eventful" + summary.update(summary_persist) + return summary + + def index( + self, + id: Sha1Git, + data: Optional[RawExtrinsicMetadata], + **kwargs, + ) -> List[OriginExtrinsicMetadataRow]: + if data is None: + raise NotImplementedError( + "ExtrinsicMetadataIndexer.index() without RawExtrinsicMetadata data" + ) + if data.target.object_type != ExtendedObjectType.ORIGIN: + # other types are not supported yet + return [] + + if data.authority.type != MetadataAuthorityType.FORGE: + # metadata provided by a third-party; don't trust it + # (technically this could be handled below, but we check it here + # to return early; sparing a translation and origin lookup) + # TODO: add ways to define trusted authorities + return [] + + metadata_items = [] + mappings = [] + for (mapping_name, mapping) in MAPPINGS.items(): + if data.format in mapping.extrinsic_metadata_formats(): + metadata_item = mapping().translate(data.metadata) + if metadata_item is not None: + metadata_items.append(metadata_item) + mappings.append(mapping_name) + + if not metadata_items: + # Don't have any mapping to parse it, ignore + return [] + + # TODO: batch requests to origin_get_by_sha1() + origins = self.storage.origin_get_by_sha1([data.target.object_id]) + try: + (origin,) = origins + if origin is None: + raise ValueError() + except ValueError: + raise ValueError(f"Unknown origin {data.target}") from None + + if urlparse(data.authority.url).netloc != urlparse(origin["url"]).netloc: + # metadata provided by a third-party; don't trust it + # TODO: add ways to define trusted authorities + return [] + + metadata = merge_documents(metadata_items) + + return [ + OriginExtrinsicMetadataRow( + id=origin["url"], + indexer_configuration_id=self.tool["id"], + from_remd_id=data.id, + mappings=mappings, + metadata=metadata, + ) + ] + + def persist_index_computations( + self, results: List[OriginExtrinsicMetadataRow] + ) -> Dict[str, int]: + """Persist the results in storage.""" + return self.idx_storage.origin_extrinsic_metadata_add(results) + + class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown directory", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) metadata = MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: - """Persist the results in storage. - - Args: - results: list of content_metadata, dict with the - following keys: - - id (bytes): content's identifier (sha1) - - metadata (jsonb): detected metadata - - """ + """Persist the results in storage.""" return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): """Directory-level indexer This indexer is in charge of: - filtering directories already indexed in directory_intrinsic_metadata table with defined computation tool - retrieve all entry_files in directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for directory """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.directory_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Directory] = None, **kwargs ) -> List[DirectoryIntrinsicMetadataRow]: """Index directory by processing it and organizing result. use metadata_detector to iterate on filenames, passes them to the content indexers, then merges (if more than one) Args: id: sha1_git of the directory data: should always be None Returns: dict: dictionary representing a directory_intrinsic_metadata, with keys: - id: directory's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ dir_: List[DirectoryLsEntry] assert data is None, "Unexpected directory object" dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(id, recursive=False)), ) try: if [entry["type"] for entry in dir_] == ["dir"]: # If the root is just a single directory, recurse into it # eg. PyPI packages, GNU tarballs subdir = dir_[0]["target"] dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(subdir, recursive=False)), ) files = [entry for entry in dir_ if entry["type"] == "file"] (mappings, metadata) = self.translate_directory_intrinsic_metadata( files, log_suffix="directory=%s" % hashutil.hash_to_hex(id), ) except Exception as e: self.log.exception("Problem when indexing dir: %r", e) sentry_sdk.capture_exception() return [] return [ DirectoryIntrinsicMetadataRow( id=id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[DirectoryIntrinsicMetadataRow] ) -> Dict[str, int]: - """Persist the results in storage. - - Args: - results: list of content_mimetype, dict with the - following keys: - - id (bytes): content's identifier (sha1) - - mimetype (bytes): mimetype in bytes - - encoding (bytes): encoding in bytes - - """ + """Persist the results in storage.""" # TODO: add functions in storage to keep data in # directory_intrinsic_metadata return self.idx_storage.directory_intrinsic_metadata_add(results) def translate_directory_intrinsic_metadata( self, files: List[DirectoryLsEntry], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata in the given root directory Args: files: list of file entries, as returned by :meth:`swh.storage.interface.StorageInterface.directory_ls` Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] all_detected_files = detect_metadata(files) used_mappings = [MAPPINGS[context].name for context in all_detected_files] for (mapping_name, detected_files) in all_detected_files.items(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = mapping_name c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get(detected_files) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) def index_list( self, origins: List[Origin], check_origin_known: bool = True, **kwargs ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: head_rev_ids = [] head_rel_ids = [] origin_heads: Dict[Origin, CoreSWHID] = {} # Filter out origins not in the storage if check_origin_known: known_origins = list( call_with_batches( self.storage.origin_get, [origin.url for origin in origins], ORIGIN_GET_BATCH_SIZE, ) ) else: known_origins = list(origins) for origin in known_origins: if origin is None: continue head_swhid = get_head_swhid(self.storage, origin.url) if head_swhid: origin_heads[origin] = head_swhid if head_swhid.object_type == ObjectType.REVISION: head_rev_ids.append(head_swhid.object_id) elif head_swhid.object_type == ObjectType.RELEASE: head_rel_ids.append(head_swhid.object_id) else: assert False, head_swhid head_revs = dict( zip( head_rev_ids, call_with_batches( self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE ), ) ) head_rels = dict( zip( head_rel_ids, call_with_batches( self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE ), ) ) results = [] for (origin, head_swhid) in origin_heads.items(): if head_swhid.object_type == ObjectType.REVISION: rev = head_revs[head_swhid.object_id] if not rev: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue directory_id = rev.directory elif head_swhid.object_type == ObjectType.RELEASE: rel = head_rels[head_swhid.object_id] if not rel: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue if rel.target_type != ModelObjectType.DIRECTORY: # TODO self.log.warning( "Head release %s of %r has unexpected target type %s", head_swhid, origin.url, rel.target_type, ) continue assert rel.target, rel directory_id = rel.target else: assert False, head_swhid for dir_metadata in self.directory_metadata_indexer.index(directory_id): # There is at most one dir_metadata orig_metadata = OriginIntrinsicMetadataRow( from_directory=dir_metadata.id, id=origin.url, metadata=dir_metadata.metadata, mappings=dir_metadata.mappings, indexer_configuration_id=dir_metadata.indexer_configuration_id, ) results.append((orig_metadata, dir_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate directories dir_metadata: List[DirectoryIntrinsicMetadataRow] = [] orig_metadata: List[OriginIntrinsicMetadataRow] = [] summary: Dict = {} for (orig_item, dir_item) in results: assert dir_item.metadata == orig_item.metadata if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if dir_item not in dir_metadata: dir_metadata.append(dir_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) if dir_metadata: summary_dir = self.idx_storage.directory_intrinsic_metadata_add( dir_metadata ) summary.update(summary_dir) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add(orig_metadata) summary.update(summary_ori) return summary diff --git a/swh/indexer/metadata_dictionary/base.py b/swh/indexer/metadata_dictionary/base.py index 44343be..82b3daf 100644 --- a/swh/indexer/metadata_dictionary/base.py +++ b/swh/indexer/metadata_dictionary/base.py @@ -1,228 +1,229 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar from typing_extensions import TypedDict from swh.indexer.codemeta import SCHEMA_URI, compact, merge_values from swh.indexer.storage.interface import Sha1 class DirectoryLsEntry(TypedDict): target: Sha1 sha1: Sha1 name: bytes type: str TTranslateCallable = TypeVar( "TTranslateCallable", bound=Callable[[Any, Dict[str, Any], Any], None] ) def produce_terms( namespace: str, terms: List[str] ) -> Callable[[TTranslateCallable], TTranslateCallable]: """Returns a decorator that marks the decorated function as adding the given terms to the ``translated_metadata`` dict""" def decorator(f: TTranslateCallable) -> TTranslateCallable: if not hasattr(f, "produced_terms"): f.produced_terms = [] # type: ignore f.produced_terms.extend(namespace + term for term in terms) # type: ignore return f return decorator class BaseMapping: """Base class for mappings to inherit from To implement a new mapping: - inherit this class - override translate function """ def __init__(self, log_suffix=""): self.log_suffix = log_suffix self.log = logging.getLogger( "%s.%s" % (self.__class__.__module__, self.__class__.__name__) ) @property def name(self): """A name of this mapping, used as an identifier in the indexer storage.""" raise NotImplementedError(f"{self.__class__.__name__}.name") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: """ Returns the sha1 hashes of files which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.detect_metadata_files") @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: """ Returns the list of extrinsic metadata formats which can be translated by this mapping """ raise NotImplementedError(f"{cls.__name__}.extrinsic_metadata_formats") def translate(self, file_content: bytes) -> Optional[Dict]: + """Translates intrinsic metadata, from the content of a file.""" raise NotImplementedError(f"{self.__class__.__name__}.translate") def normalize_translation(self, metadata: Dict[str, Any]) -> Dict[str, Any]: return compact(metadata) class SingleFileMapping(BaseMapping): """Base class for all intrinsic metadata mappings that use a single file as input.""" @property def filename(self): """The .json file to extract metadata from.""" raise NotImplementedError(f"{self.__class__.__name__}.filename") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].lower() == cls.filename: return [entry["sha1"]] return [] @classmethod def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: # this class is only used by intrinsic metadata mappings return () class DictMapping(BaseMapping): """Base class for mappings that take as input a file that is mostly a key-value store (eg. a shallow JSON dict).""" string_fields = [] # type: List[str] """List of fields that are simple strings, and don't need any normalization.""" @property def mapping(self): """A translation dict to map dict keys into a canonical name.""" raise NotImplementedError(f"{self.__class__.__name__}.mapping") @staticmethod def _normalize_method_name(name: str) -> str: return name.replace("-", "_") @classmethod def supported_terms(cls): # one-to-one mapping from the original key to a CodeMeta term simple_terms = { term for (key, term) in cls.mapping.items() if key in cls.string_fields or hasattr(cls, "normalize_" + cls._normalize_method_name(key)) } # more complex mapping from the original key to JSON-LD complex_terms = { term for meth_name in dir(cls) if meth_name.startswith("translate_") for term in getattr(getattr(cls, meth_name), "produced_terms", []) } return simple_terms | complex_terms def _translate_dict( self, content_dict: Dict, *, normalize: bool = True ) -> Dict[str, str]: """ Translates content by parsing content from a dict object and translating with the appropriate mapping Args: content_dict (dict): content dict to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ translated_metadata = {"@type": SCHEMA_URI + "SoftwareSourceCode"} for k, v in content_dict.items(): # First, check if there is a specific translation # method for this key translation_method = getattr( self, "translate_" + self._normalize_method_name(k), None ) if translation_method: translation_method(translated_metadata, v) elif k in self.mapping: # if there is no method, but the key is known from the # crosswalk table codemeta_key = self.mapping[k] # if there is a normalization method, use it on the value normalization_method = getattr( self, "normalize_" + self._normalize_method_name(k), None ) if normalization_method: v = normalization_method(v) elif k in self.string_fields and isinstance(v, str): pass elif k in self.string_fields and isinstance(v, list): v = [x for x in v if isinstance(x, str)] else: continue # set the translation metadata with the normalized value if codemeta_key in translated_metadata: translated_metadata[codemeta_key] = merge_values( translated_metadata[codemeta_key], v ) else: translated_metadata[codemeta_key] = v if normalize: return self.normalize_translation(translated_metadata) else: return translated_metadata class JsonMapping(DictMapping): """Base class for all mappings that use JSON data as input.""" def translate(self, raw_content: bytes) -> Optional[Dict]: """ Translates content by parsing content from a bytestring containing json data and translating with the appropriate mapping Args: raw_content (bytes): raw content to translate Returns: dict: translated metadata in json-friendly form needed for the indexer """ try: raw_content_string: str = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return None try: content_dict = json.loads(raw_content_string) except json.JSONDecodeError: self.log.warning("Error unjsoning from %s", self.log_suffix) return None if isinstance(content_dict, dict): return self._translate_dict(content_dict) return None diff --git a/swh/indexer/metadata_dictionary/ruby.py b/swh/indexer/metadata_dictionary/ruby.py index 6dc3205..55ce217 100644 --- a/swh/indexer/metadata_dictionary/ruby.py +++ b/swh/indexer/metadata_dictionary/ruby.py @@ -1,135 +1,140 @@ # Copyright (C) 2018-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import ast import itertools import re -from typing import List +from typing import List, Tuple from swh.indexer.codemeta import CROSSWALK_TABLE, SCHEMA_URI from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.storage.interface import Sha1 from .base import DictMapping def name_to_person(name): return { "@type": SCHEMA_URI + "Person", SCHEMA_URI + "name": name, } class GemspecMapping(DictMapping): name = "gemspec" mapping = CROSSWALK_TABLE["Ruby Gem"] string_fields = ["name", "version", "description", "summary", "email"] _re_spec_new = re.compile(r".*Gem::Specification.new +(do|\{) +\|.*\|.*") _re_spec_entry = re.compile(r"\s*\w+\.(?P\w+)\s*=\s*(?P.*)") @classmethod def detect_metadata_files(cls, file_entries: List[DirectoryLsEntry]) -> List[Sha1]: for entry in file_entries: if entry["name"].endswith(b".gemspec"): return [entry["sha1"]] return [] + @classmethod + def extrinsic_metadata_formats(cls) -> Tuple[str, ...]: + # this class is only used by intrinsic metadata mappings + return () + def translate(self, raw_content): try: raw_content = raw_content.decode() except UnicodeDecodeError: self.log.warning("Error unidecoding from %s", self.log_suffix) return # Skip lines before 'Gem::Specification.new' lines = itertools.dropwhile( lambda x: not self._re_spec_new.match(x), raw_content.split("\n") ) try: next(lines) # Consume 'Gem::Specification.new' except StopIteration: self.log.warning("Could not find Gem::Specification in %s", self.log_suffix) return content_dict = {} for line in lines: match = self._re_spec_entry.match(line) if match: value = self.eval_ruby_expression(match.group("expr")) if value: content_dict[match.group("key")] = value return self._translate_dict(content_dict) def eval_ruby_expression(self, expr): """Very simple evaluator of Ruby expressions. >>> GemspecMapping().eval_ruby_expression('"Foo bar"') 'Foo bar' >>> GemspecMapping().eval_ruby_expression("'Foo bar'") 'Foo bar' >>> GemspecMapping().eval_ruby_expression("['Foo', 'bar']") ['Foo', 'bar'] >>> GemspecMapping().eval_ruby_expression("'Foo bar'.freeze") 'Foo bar' >>> GemspecMapping().eval_ruby_expression( \ "['Foo'.freeze, 'bar'.freeze]") ['Foo', 'bar'] """ def evaluator(node): if isinstance(node, ast.Str): return node.s elif isinstance(node, ast.List): res = [] for element in node.elts: val = evaluator(element) if not val: return res.append(val) return res expr = expr.replace(".freeze", "") try: # We're parsing Ruby expressions here, but Python's # ast.parse works for very simple Ruby expressions # (mainly strings delimited with " or ', and lists # of such strings). tree = ast.parse(expr, mode="eval") except (SyntaxError, ValueError): return if isinstance(tree, ast.Expression): return evaluator(tree.body) def normalize_homepage(self, s): if isinstance(s, str): return {"@id": s} def normalize_license(self, s): if isinstance(s, str): return [{"@id": "https://spdx.org/licenses/" + s}] def normalize_licenses(self, licenses): if isinstance(licenses, list): return [ {"@id": "https://spdx.org/licenses/" + license} for license in licenses if isinstance(license, str) ] def normalize_author(self, author): if isinstance(author, str): return {"@list": [name_to_person(author)]} def normalize_authors(self, authors): if isinstance(authors, list): return { "@list": [ name_to_person(author) for author in authors if isinstance(author, str) ] } diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 1e4dfdf..c01b8ee 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,143 +1,270 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.indexer.metadata import ContentMetadataIndexer, DirectoryMetadataIndexer -from swh.indexer.storage.model import ContentMetadataRow, DirectoryIntrinsicMetadataRow +import datetime +from unittest.mock import call + +import attr + +from swh.indexer.metadata import ( + ContentMetadataIndexer, + DirectoryMetadataIndexer, + ExtrinsicMetadataIndexer, +) +from swh.indexer.storage.model import ( + ContentMetadataRow, + DirectoryIntrinsicMetadataRow, + OriginExtrinsicMetadataRow, +) from swh.indexer.tests.utils import DIRECTORY2 -from swh.model.model import Directory, DirectoryEntry +from swh.model.model import ( + Directory, + DirectoryEntry, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + RawExtrinsicMetadata, +) +from swh.model.swhids import ExtendedObjectType, ExtendedSWHID from .utils import ( BASE_TEST_CONFIG, YARN_PARSER_METADATA, fill_obj_storage, fill_storage, ) TRANSLATOR_TOOL = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {"type": "local", "context": "NpmMapping"}, } class ContentMetadataTestIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def parse_config_file(self, *args, **kwargs): assert False, "should not be called; the dir indexer configures it." DIRECTORY_METADATA_CONFIG = { **BASE_TEST_CONFIG, "tools": TRANSLATOR_TOOL, } +REMD = RawExtrinsicMetadata( + target=ExtendedSWHID( + object_type=ExtendedObjectType.ORIGIN, + object_id=b"\x01" * 20, + ), + discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), + authority=MetadataAuthority( + type=MetadataAuthorityType.FORGE, + url="https://example.org/", + ), + fetcher=MetadataFetcher( + name="example-fetcher", + version="1.0.0", + ), + format="application/vnd.github.v3+json", + metadata=b'{"full_name": "test software"}', +) + class TestMetadata: """ Tests metadata_mock_tool tool for Metadata detection """ def test_directory_metadata_indexer(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None dir_ = DIRECTORY2 metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( id=DIRECTORY2.entries[0].target, indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([dir_.id]) results = list( metadata_indexer.idx_storage.directory_intrinsic_metadata_get( [DIRECTORY2.id] ) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=dir_.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results def test_directory_metadata_indexer_single_root_dir(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) # Add a parent directory, that is the only directory at the root # of the directory dir_ = DIRECTORY2 new_dir = Directory( entries=( DirectoryEntry( name=b"foobar-1.0.0", type="dir", target=dir_.id, perms=16384, ), ), ) assert new_dir.id is not None metadata_indexer.storage.directory_add([new_dir]) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( id=DIRECTORY2.entries[0].target, indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([new_dir.id]) results = list( metadata_indexer.idx_storage.directory_intrinsic_metadata_get([new_dir.id]) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=new_dir.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results + + def test_extrinsic_metadata_indexer_unknown_format(self, mocker): + """Should be ignored when unknown format""" + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + + remd = attr.evolve(REMD, format="unknown format") + + results = metadata_indexer.index(remd.id, data=remd) + + assert metadata_indexer.storage.method_calls == [] + assert results == [] + + def test_extrinsic_metadata_indexer_github(self, mocker): + """Nominal case, calling the mapping and storing the result""" + origin = "https://example.org/jdoe/myrepo" + + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.catch_exceptions = False + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] + + tool = metadata_indexer.idx_storage.indexer_configuration_get( + {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} + ) + assert tool is not None + + assert metadata_indexer.process_journal_objects( + {"raw_extrinsic_metadata": [REMD.to_dict()]} + ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} + + assert metadata_indexer.storage.method_calls == [ + call.origin_get_by_sha1([b"\x01" * 20]) + ] + + results = list( + metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) + ) + assert results == [ + OriginExtrinsicMetadataRow( + id="https://example.org/jdoe/myrepo", + tool={"id": tool["id"], **TRANSLATOR_TOOL}, + metadata={ + "@context": "https://doi.org/10.5063/schema/codemeta-2.0", + "type": "https://forgefed.org/ns#Repository", + "name": "test software", + }, + from_remd_id=REMD.id, + mappings=["GitHubMapping"], + ) + ] + + def test_extrinsic_metadata_indexer_nonforge_authority(self, mocker): + """Early abort on non-forge authorities""" + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + + remd = attr.evolve( + REMD, + authority=attr.evolve(REMD.authority, type=MetadataAuthorityType.REGISTRY), + ) + + results = metadata_indexer.index(remd.id, data=remd) + + assert metadata_indexer.storage.method_calls == [] + assert results == [] + + def test_extrinsic_metadata_indexer_thirdparty_authority(self, mocker): + """Should be ignored when authority URL does not match the origin""" + + origin = "https://different-domain.example.org/jdoe/myrepo" + + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.catch_exceptions = False + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] + + tool = metadata_indexer.idx_storage.indexer_configuration_get( + {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} + ) + assert tool is not None + + results = metadata_indexer.index(REMD.id, data=REMD) + + assert metadata_indexer.storage.method_calls == [ + call.origin_get_by_sha1([b"\x01" * 20]) + ] + assert results == []