diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py index d9b3eb3..bcc31d3 100644 --- a/swh/indexer/metadata.py +++ b/swh/indexer/metadata.py @@ -1,561 +1,561 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy import itertools import logging import time from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, cast, ) from urllib.parse import urlparse import sentry_sdk from swh.core.config import merge_configs from swh.core.utils import grouper from swh.indexer.codemeta import merge_documents from swh.indexer.indexer import ( BaseIndexer, ContentIndexer, DirectoryIndexer, ObjectsDict, OriginIndexer, ) from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_dictionary import EXTRINSIC_MAPPINGS, INTRINSIC_MAPPINGS from swh.indexer.metadata_dictionary.base import DirectoryLsEntry from swh.indexer.origin_head import get_head_swhid from swh.indexer.storage import INDEXER_CFG_KEY, Sha1 from swh.indexer.storage.model import ( ContentMetadataRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.model import hashutil from swh.model.model import Directory, MetadataAuthorityType from swh.model.model import ObjectType as ModelObjectType from swh.model.model import Origin, RawExtrinsicMetadata, Sha1Git from swh.model.swhids import CoreSWHID, ExtendedObjectType, ObjectType REVISION_GET_BATCH_SIZE = 10 RELEASE_GET_BATCH_SIZE = 10 ORIGIN_GET_BATCH_SIZE = 10 T1 = TypeVar("T1") T2 = TypeVar("T2") logger = logging.getLogger(__name__) def call_with_batches( f: Callable[[List[T1]], Iterable[T2]], args: List[T1], batch_size: int, ) -> Iterator[T2]: """Calls a function with batches of args, and concatenates the results.""" groups = grouper(args, batch_size) for group in groups: yield from f(list(group)) class ExtrinsicMetadataIndexer( BaseIndexer[Sha1Git, RawExtrinsicMetadata, OriginExtrinsicMetadataRow] ): def process_journal_objects(self, objects: ObjectsDict) -> Dict: summary: Dict[str, Any] = {"status": "uneventful"} try: results = {} for item in objects.get("raw_extrinsic_metadata", []): remd = RawExtrinsicMetadata.from_dict(item) sentry_sdk.set_tag("swh-indexer-remd-swhid", str(remd.swhid())) results[remd.target] = self.index(remd.id, data=remd) except Exception: if not self.catch_exceptions: raise summary["status"] = "failed" return summary self.results = list(itertools.chain.from_iterable(results.values())) summary_persist = self.persist_index_computations(self.results) if summary_persist: for value in summary_persist.values(): if value > 0: summary["status"] = "eventful" summary.update(summary_persist) return summary def index( self, id: Sha1Git, data: Optional[RawExtrinsicMetadata], **kwargs, ) -> List[OriginExtrinsicMetadataRow]: if data is None: raise NotImplementedError( "ExtrinsicMetadataIndexer.index() without RawExtrinsicMetadata data" ) if data.target.object_type != ExtendedObjectType.ORIGIN: # other types are not supported yet return [] - if data.authority.type != MetadataAuthorityType.FORGE: + if data.authority.type == MetadataAuthorityType.REGISTRY: # metadata provided by a third-party; don't trust it # (technically this could be handled below, but we check it here # to return early; sparing a translation and origin lookup) # TODO: add ways to define trusted authorities return [] metadata_items = [] mappings: List[str] = [] for mapping_cls in EXTRINSIC_MAPPINGS.values(): if data.format in mapping_cls.extrinsic_metadata_formats(): mapping = mapping_cls() metadata_item = mapping.translate(data.metadata) if metadata_item is not None: metadata_items.append(metadata_item) mappings.append(mapping.name) if not metadata_items: # Don't have any mapping to parse it, ignore return [] # TODO: batch requests to origin_get_by_sha1() for _ in range(6): origins = self.storage.origin_get_by_sha1([data.target.object_id]) try: (origin,) = origins if origin is not None: break except ValueError: pass # The origin does not exist. This may be due to some replication lag # between the loader's DB/journal and the DB we are consuming from. # Wait a bit and try again logger.debug("Origin %s not found, sleeping for 10s.", data.target) time.sleep(10) else: # Does not exist, or replication lag > 60s. raise ValueError(f"Unknown origin {data.target}") from None if urlparse(data.authority.url).netloc != urlparse(origin["url"]).netloc: # metadata provided by a third-party; don't trust it # TODO: add ways to define trusted authorities return [] metadata = merge_documents(metadata_items) return [ OriginExtrinsicMetadataRow( id=origin["url"], indexer_configuration_id=self.tool["id"], from_remd_id=data.id, mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[OriginExtrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.origin_extrinsic_metadata_add(results) class ContentMetadataIndexer(ContentIndexer[ContentMetadataRow]): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.content_metadata_missing( ( { "id": sha1, "indexer_configuration_id": self.tool["id"], } for sha1 in ids ) ) def index( self, id: Sha1, data: Optional[bytes] = None, log_suffix="unknown directory", **kwargs, ) -> List[ContentMetadataRow]: """Index sha1s' content and store result. Args: id: content's identifier data: raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the metadata keys will be returned as None """ assert isinstance(id, bytes) assert data is not None metadata = None try: mapping_name = self.tool["tool_configuration"]["context"] log_suffix += ", content_id=%s" % hashutil.hash_to_hex(id) metadata = INTRINSIC_MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id) ) sentry_sdk.capture_exception() if metadata is None: return [] return [ ContentMetadataRow( id=id, indexer_configuration_id=self.tool["id"], metadata=metadata, ) ] def persist_index_computations( self, results: List[ContentMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" return self.idx_storage.content_metadata_add(results) DEFAULT_CONFIG: Dict[str, Any] = { "tools": { "name": "swh-metadata-detector", "version": "0.0.2", "configuration": {}, }, } class DirectoryMetadataIndexer(DirectoryIndexer[DirectoryIntrinsicMetadataRow]): """Directory-level indexer This indexer is in charge of: - filtering directories already indexed in directory_intrinsic_metadata table with defined computation tool - retrieve all entry_files in directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for directory """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.config = merge_configs(DEFAULT_CONFIG, self.config) def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones.""" yield from self.idx_storage.directory_intrinsic_metadata_missing( ( { "id": sha1_git, "indexer_configuration_id": self.tool["id"], } for sha1_git in sha1_gits ) ) def index( self, id: Sha1Git, data: Optional[Directory] = None, **kwargs ) -> List[DirectoryIntrinsicMetadataRow]: """Index directory by processing it and organizing result. use metadata_detector to iterate on filenames, passes them to the content indexers, then merges (if more than one) Args: id: sha1_git of the directory data: should always be None Returns: dict: dictionary representing a directory_intrinsic_metadata, with keys: - id: directory's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - metadata: dict of retrieved metadata """ dir_: List[DirectoryLsEntry] assert data is None, "Unexpected directory object" dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(id, recursive=False)), ) try: if [entry["type"] for entry in dir_] == ["dir"]: # If the root is just a single directory, recurse into it # eg. PyPI packages, GNU tarballs subdir = dir_[0]["target"] dir_ = cast( List[DirectoryLsEntry], list(self.storage.directory_ls(subdir, recursive=False)), ) files = [entry for entry in dir_ if entry["type"] == "file"] (mappings, metadata) = self.translate_directory_intrinsic_metadata( files, log_suffix="directory=%s" % hashutil.hash_to_hex(id), ) except Exception as e: self.log.exception("Problem when indexing dir: %r", e) sentry_sdk.capture_exception() return [] return [ DirectoryIntrinsicMetadataRow( id=id, indexer_configuration_id=self.tool["id"], mappings=mappings, metadata=metadata, ) ] def persist_index_computations( self, results: List[DirectoryIntrinsicMetadataRow] ) -> Dict[str, int]: """Persist the results in storage.""" # TODO: add functions in storage to keep data in # directory_intrinsic_metadata return self.idx_storage.directory_intrinsic_metadata_add(results) def translate_directory_intrinsic_metadata( self, files: List[DirectoryLsEntry], log_suffix: str ) -> Tuple[List[Any], Any]: """ Determine plan of action to translate metadata in the given root directory Args: files: list of file entries, as returned by :meth:`swh.storage.interface.StorageInterface.directory_ls` Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ metadata = [] tool = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {}, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = {k: self.config[k] for k in [INDEXER_CFG_KEY, "objstorage", "storage"]} config["tools"] = [tool] all_detected_files = detect_metadata(files) used_mappings = [ INTRINSIC_MAPPINGS[context].name for context in all_detected_files ] for (mapping_name, detected_files) in all_detected_files.items(): cfg = deepcopy(config) cfg["tools"][0]["configuration"]["context"] = mapping_name c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get(detected_files) for c in metadata_generator: # extracting metadata sha1 = c.id sha1s_in_storage.append(sha1) local_metadata = c.metadata # local metadata is aggregated if local_metadata: metadata.append(local_metadata) sha1s_filtered = [ item for item in detected_files if item not in sha1s_in_storage ] if sha1s_filtered: # content indexing try: c_metadata_indexer.run( sha1s_filtered, log_suffix=log_suffix, ) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result.metadata metadata.append(local_metadata) except Exception: self.log.exception("Exception while indexing metadata on contents") sentry_sdk.capture_exception() metadata = merge_documents(metadata) return (used_mappings, metadata) class OriginMetadataIndexer( OriginIndexer[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]] ): USE_TOOLS = False def __init__(self, config=None, **kwargs) -> None: super().__init__(config=config, **kwargs) self.directory_metadata_indexer = DirectoryMetadataIndexer(config=config) def index_list( self, origins: List[Origin], *, check_origin_known: bool = True, **kwargs, ) -> List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]]: head_rev_ids = [] head_rel_ids = [] origin_heads: Dict[Origin, CoreSWHID] = {} # Filter out origins not in the storage if check_origin_known: known_origins = list( call_with_batches( self.storage.origin_get, [origin.url for origin in origins], ORIGIN_GET_BATCH_SIZE, ) ) else: known_origins = list(origins) for origin in known_origins: if origin is None: continue head_swhid = get_head_swhid(self.storage, origin.url) if head_swhid: origin_heads[origin] = head_swhid if head_swhid.object_type == ObjectType.REVISION: head_rev_ids.append(head_swhid.object_id) elif head_swhid.object_type == ObjectType.RELEASE: head_rel_ids.append(head_swhid.object_id) else: assert False, head_swhid head_revs = dict( zip( head_rev_ids, call_with_batches( self.storage.revision_get, head_rev_ids, REVISION_GET_BATCH_SIZE ), ) ) head_rels = dict( zip( head_rel_ids, call_with_batches( self.storage.release_get, head_rel_ids, RELEASE_GET_BATCH_SIZE ), ) ) results = [] for (origin, head_swhid) in origin_heads.items(): sentry_sdk.set_tag("swh-indexer-origin-url", origin.url) sentry_sdk.set_tag("swh-indexer-origin-head-swhid", str(head_swhid)) if head_swhid.object_type == ObjectType.REVISION: rev = head_revs[head_swhid.object_id] if not rev: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue directory_id = rev.directory elif head_swhid.object_type == ObjectType.RELEASE: rel = head_rels[head_swhid.object_id] if not rel: self.log.warning( "Missing head object %s of origin %r", head_swhid, origin.url ) continue if rel.target_type != ModelObjectType.DIRECTORY: # TODO self.log.warning( "Head release %s of %r has unexpected target type %s", head_swhid, origin.url, rel.target_type, ) continue assert rel.target, rel directory_id = rel.target else: assert False, head_swhid for dir_metadata in self.directory_metadata_indexer.index(directory_id): # There is at most one dir_metadata orig_metadata = OriginIntrinsicMetadataRow( from_directory=dir_metadata.id, id=origin.url, metadata=dir_metadata.metadata, mappings=dir_metadata.mappings, indexer_configuration_id=dir_metadata.indexer_configuration_id, ) results.append((orig_metadata, dir_metadata)) return results def persist_index_computations( self, results: List[Tuple[OriginIntrinsicMetadataRow, DirectoryIntrinsicMetadataRow]], ) -> Dict[str, int]: # Deduplicate directories dir_metadata: Dict[bytes, DirectoryIntrinsicMetadataRow] = {} orig_metadata: Dict[str, OriginIntrinsicMetadataRow] = {} summary: Dict = {} for (orig_item, dir_item) in results: assert dir_item.metadata == orig_item.metadata if dir_item.metadata and not (dir_item.metadata.keys() <= {"@context"}): # Only store non-empty metadata sets if dir_item.id not in dir_metadata: dir_metadata[dir_item.id] = dir_item if orig_item.id not in orig_metadata: orig_metadata[orig_item.id] = orig_item if dir_metadata: summary_dir = self.idx_storage.directory_intrinsic_metadata_add( list(dir_metadata.values()) ) summary.update(summary_dir) if orig_metadata: summary_ori = self.idx_storage.origin_intrinsic_metadata_add( list(orig_metadata.values()) ) summary.update(summary_ori) return summary diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 86bc778..439a683 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,924 +1,924 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch import attr from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer import fossology_license from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( ContentLicenseRow, ContentMimetypeRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes from swh.model.model import Content, Origin, OriginVisitStatus -from .test_metadata import REMD +from .test_metadata import GITHUB_REMD from .utils import ( DIRECTORY2, RAW_CONTENT_IDS, RAW_CONTENTS, REVISION, SHA1_TO_LICENSES, mock_compute_license, ) def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] directory_metadata = [ DirectoryIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.directory_intrinsic_metadata_add(directory_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "composer", "gemspec", "gitea", "github", "json-sword-codemeta", "maven", "npm", "nuget", "pkg-info", "pubspec", "sword-codemeta", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta", "--exclude-mapping", "json-sword-codemeta", "--exclude-mapping", "sword-codemeta", ], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) @pytest.mark.parametrize("indexer_name", ["origin_intrinsic_metadata", "*"]) def test_cli_journal_client_index__origin_intrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( "swh.indexer.metadata.get_head_swhid", return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.DirectoryMetadataIndexer.index", return_value=[ DirectoryIntrinsicMetadataRow( id=DIRECTORY2.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_directory=DIRECTORY2.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results @pytest.mark.parametrize("indexer_name", ["extrinsic_metadata", "*"]) def test_cli_journal_client_index__origin_extrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) origin = Origin("http://example.org/repo.git") storage.origin_add([origin]) - raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid()) + raw_extrinsic_metadata = attr.evolve(GITHUB_REMD, target=origin.swhid()) raw_extrinsic_metadata = attr.evolve( raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash() ) journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata]) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", 1, ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_extrinsic_metadata_get([origin.url]) expected_results = [ OriginExtrinsicMetadataRow( id=origin.url, from_remd_id=raw_extrinsic_metadata.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["github"], metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "id": "http://example.org/", "type": "https://forgefed.org/ns#Repository", "name": "test software", }, ) ] assert sorted(results, key=lambda r: r.id) == expected_results def test_cli_journal_client_index__content_mimetype( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, obj_storage, storage, mocker, swh_indexer_config, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) contents = [] expected_results = [] content_ids = [] for content_id, (raw_content, mimetypes, encoding) in RAW_CONTENTS.items(): content = Content.from_data(raw_content) assert content_id == content.sha1 contents.append(content) content_ids.append(content_id) # Older libmagic versions (e.g. buster: 1:5.35-4+deb10u2, bullseye: 1:5.39-3) # returns different results. This allows to deal with such a case when executing # tests on different environments machines (e.g. ci tox, ci debian, dev machine, # ...) all_mimetypes = mimetypes if isinstance(mimetypes, tuple) else [mimetypes] expected_results.extend( [ ContentMimetypeRow( id=content.sha1, tool={"id": 1, **swh_indexer_config["tools"]}, mimetype=mimetype, encoding=encoding, ) for mimetype in all_mimetypes ] ) assert len(contents) == len(RAW_CONTENTS) journal_writer.write_additions("content", contents) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "content_mimetype", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(contents), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.content_mimetype_get(content_ids) assert len(results) == len(contents) for result in results: assert result in expected_results def test_cli_journal_client_index__fossology_license( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, obj_storage, storage, mocker, swh_indexer_config, ): """Test the 'swh indexer journal-client' cli tool.""" # Patch fossology_license.compute_license = mock_compute_license journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) tool = {"id": 1, **swh_indexer_config["tools"]} id0, id1, id2 = RAW_CONTENT_IDS contents = [] content_ids = [] expected_results = [] for content_id, (raw_content, _, _) in RAW_CONTENTS.items(): content = Content.from_data(raw_content) assert content_id == content.sha1 contents.append(content) content_ids.append(content_id) expected_results.extend( [ ContentLicenseRow(id=content_id, tool=tool, license=license) for license in SHA1_TO_LICENSES[content_id] ] ) assert len(contents) == len(RAW_CONTENTS) journal_writer.write_additions("content", contents) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "content_fossology_license", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(contents), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.content_fossology_license_get(content_ids) assert len(results) == len(expected_results) for result in results: assert result in expected_results diff --git a/swh/indexer/tests/test_metadata.py b/swh/indexer/tests/test_metadata.py index 1f54d73..37b574c 100644 --- a/swh/indexer/tests/test_metadata.py +++ b/swh/indexer/tests/test_metadata.py @@ -1,313 +1,414 @@ # Copyright (C) 2017-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from unittest.mock import call import attr from swh.indexer.metadata import ( ContentMetadataIndexer, DirectoryMetadataIndexer, ExtrinsicMetadataIndexer, ) from swh.indexer.storage.model import ( ContentMetadataRow, DirectoryIntrinsicMetadataRow, OriginExtrinsicMetadataRow, ) from swh.indexer.tests.utils import DIRECTORY2 from swh.model.model import ( Directory, DirectoryEntry, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, RawExtrinsicMetadata, ) from swh.model.swhids import ExtendedObjectType, ExtendedSWHID from .utils import ( BASE_TEST_CONFIG, MAPPING_DESCRIPTION_CONTENT_SHA1, MAPPING_DESCRIPTION_CONTENT_SHA1GIT, YARN_PARSER_METADATA, fill_obj_storage, fill_storage, ) TRANSLATOR_TOOL = { "name": "swh-metadata-translator", "version": "0.0.2", "configuration": {"type": "local", "context": "NpmMapping"}, } class ContentMetadataTestIndexer(ContentMetadataIndexer): """Specific Metadata whose configuration is enough to satisfy the indexing tests. """ def parse_config_file(self, *args, **kwargs): assert False, "should not be called; the dir indexer configures it." DIRECTORY_METADATA_CONFIG = { **BASE_TEST_CONFIG, "tools": TRANSLATOR_TOOL, } -REMD = RawExtrinsicMetadata( +DEPOSIT_REMD = RawExtrinsicMetadata( + target=ExtendedSWHID( + object_type=ExtendedObjectType.ORIGIN, + object_id=b"\x01" * 20, + ), + discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), + authority=MetadataAuthority( + type=MetadataAuthorityType.DEPOSIT_CLIENT, + url="https://example.org/", + ), + fetcher=MetadataFetcher( + name="example-fetcher", + version="1.0.0", + ), + format="sword-v2-atom-codemeta-v2", + metadata=""" + + My Software + + Author 1 + foo@example.org + + + Author 2 + + + """.encode(), +) + +GITHUB_REMD = RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.ORIGIN, object_id=b"\x01" * 20, ), discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), authority=MetadataAuthority( type=MetadataAuthorityType.FORGE, url="https://example.org/", ), fetcher=MetadataFetcher( name="example-fetcher", version="1.0.0", ), format="application/vnd.github.v3+json", metadata=b'{"full_name": "test software", "html_url": "http://example.org/"}', ) class TestMetadata: """ Tests metadata_mock_tool tool for Metadata detection """ def test_directory_metadata_indexer(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None dir_ = DIRECTORY2 assert ( dir_.entries[0].target == MAPPING_DESCRIPTION_CONTENT_SHA1GIT["json:yarn-parser-package.json"] ) metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( id=MAPPING_DESCRIPTION_CONTENT_SHA1[ "json:yarn-parser-package.json" ], indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([dir_.id]) results = list( metadata_indexer.idx_storage.directory_intrinsic_metadata_get([dir_.id]) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=dir_.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results def test_directory_metadata_indexer_single_root_dir(self): metadata_indexer = DirectoryMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) fill_obj_storage(metadata_indexer.objstorage) fill_storage(metadata_indexer.storage) # Add a parent directory, that is the only directory at the root # of the directory dir_ = DIRECTORY2 assert ( dir_.entries[0].target == MAPPING_DESCRIPTION_CONTENT_SHA1GIT["json:yarn-parser-package.json"] ) new_dir = Directory( entries=( DirectoryEntry( name=b"foobar-1.0.0", type="dir", target=dir_.id, perms=16384, ), ), ) assert new_dir.id is not None metadata_indexer.storage.directory_add([new_dir]) tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None metadata_indexer.idx_storage.content_metadata_add( [ ContentMetadataRow( id=MAPPING_DESCRIPTION_CONTENT_SHA1[ "json:yarn-parser-package.json" ], indexer_configuration_id=tool["id"], metadata=YARN_PARSER_METADATA, ) ] ) metadata_indexer.run([new_dir.id]) results = list( metadata_indexer.idx_storage.directory_intrinsic_metadata_get([new_dir.id]) ) expected_results = [ DirectoryIntrinsicMetadataRow( id=new_dir.id, tool=TRANSLATOR_TOOL, metadata=YARN_PARSER_METADATA, mappings=["npm"], ) ] for result in results: del result.tool["id"] assert results == expected_results def test_extrinsic_metadata_indexer_unknown_format(self, mocker): """Should be ignored when unknown format""" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") - remd = attr.evolve(REMD, format="unknown format") + remd = attr.evolve(GITHUB_REMD, format="unknown format") results = metadata_indexer.index(remd.id, data=remd) assert metadata_indexer.storage.method_calls == [] assert results == [] def test_extrinsic_metadata_indexer_github(self, mocker): """Nominal case, calling the mapping and storing the result""" origin = "https://example.org/jdoe/myrepo" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.catch_exceptions = False metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None assert metadata_indexer.process_journal_objects( - {"raw_extrinsic_metadata": [REMD.to_dict()]} + {"raw_extrinsic_metadata": [GITHUB_REMD.to_dict()]} ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} assert metadata_indexer.storage.method_calls == [ call.origin_get_by_sha1([b"\x01" * 20]) ] results = list( metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) ) assert results == [ OriginExtrinsicMetadataRow( id="https://example.org/jdoe/myrepo", tool={"id": tool["id"], **TRANSLATOR_TOOL}, metadata={ "@context": "https://doi.org/10.5063/schema/codemeta-2.0", "id": "http://example.org/", "type": "https://forgefed.org/ns#Repository", "name": "test software", }, - from_remd_id=REMD.id, + from_remd_id=GITHUB_REMD.id, mappings=["github"], ) ] + def test_extrinsic_metadata_indexer_firstparty_deposit(self, mocker): + """Also nominal case, calling the mapping and storing the result""" + origin = "https://example.org/jdoe/myrepo" + + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.catch_exceptions = False + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] + + tool = metadata_indexer.idx_storage.indexer_configuration_get( + {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} + ) + assert tool is not None + + assert metadata_indexer.process_journal_objects( + {"raw_extrinsic_metadata": [DEPOSIT_REMD.to_dict()]} + ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} + + assert metadata_indexer.storage.method_calls == [ + call.origin_get_by_sha1([b"\x01" * 20]) + ] + + results = list( + metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) + ) + assert results == [ + OriginExtrinsicMetadataRow( + id="https://example.org/jdoe/myrepo", + tool={"id": tool["id"], **TRANSLATOR_TOOL}, + metadata={ + "@context": "https://doi.org/10.5063/schema/codemeta-2.0", + "author": [ + {"email": "foo@example.org", "name": "Author 1"}, + {"name": "Author 2"}, + ], + "name": "My Software", + }, + from_remd_id=DEPOSIT_REMD.id, + mappings=["sword-codemeta"], + ) + ] + + def test_extrinsic_metadata_indexer_thirdparty_deposit(self, mocker): + """Metadata-only deposit: currently ignored""" + origin = "https://not-from-example.org/jdoe/myrepo" + + metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) + metadata_indexer.catch_exceptions = False + metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") + metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] + + tool = metadata_indexer.idx_storage.indexer_configuration_get( + {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} + ) + assert tool is not None + + assert metadata_indexer.process_journal_objects( + {"raw_extrinsic_metadata": [DEPOSIT_REMD.to_dict()]} + ) == {"status": "uneventful", "origin_extrinsic_metadata:add": 0} + + assert metadata_indexer.storage.method_calls == [ + call.origin_get_by_sha1([b"\x01" * 20]) + ] + + results = list( + metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) + ) + assert results == [] + def test_extrinsic_metadata_indexer_nonforge_authority(self, mocker): """Early abort on non-forge authorities""" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") remd = attr.evolve( - REMD, - authority=attr.evolve(REMD.authority, type=MetadataAuthorityType.REGISTRY), + GITHUB_REMD, + authority=attr.evolve( + GITHUB_REMD.authority, type=MetadataAuthorityType.REGISTRY + ), ) results = metadata_indexer.index(remd.id, data=remd) assert metadata_indexer.storage.method_calls == [] assert results == [] def test_extrinsic_metadata_indexer_thirdparty_authority(self, mocker): """Should be ignored when authority URL does not match the origin""" origin = "https://different-domain.example.org/jdoe/myrepo" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.catch_exceptions = False metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None - results = metadata_indexer.index(REMD.id, data=REMD) + results = metadata_indexer.index(GITHUB_REMD.id, data=GITHUB_REMD) assert metadata_indexer.storage.method_calls == [ call.origin_get_by_sha1([b"\x01" * 20]) ] assert results == [] def test_extrinsic_metadata_indexer_duplicate_origin(self, mocker): """Nominal case, calling the mapping and storing the result""" origin = "https://example.org/jdoe/myrepo" metadata_indexer = ExtrinsicMetadataIndexer(config=DIRECTORY_METADATA_CONFIG) metadata_indexer.catch_exceptions = False metadata_indexer.storage = mocker.patch.object(metadata_indexer, "storage") metadata_indexer.storage.origin_get_by_sha1.return_value = [{"url": origin}] tool = metadata_indexer.idx_storage.indexer_configuration_get( {f"tool_{k}": v for (k, v) in TRANSLATOR_TOOL.items()} ) assert tool is not None assert metadata_indexer.process_journal_objects( { "raw_extrinsic_metadata": [ - REMD.to_dict(), - {**REMD.to_dict(), "id": b"\x00" * 20}, + GITHUB_REMD.to_dict(), + {**GITHUB_REMD.to_dict(), "id": b"\x00" * 20}, ] } ) == {"status": "eventful", "origin_extrinsic_metadata:add": 1} results = list( metadata_indexer.idx_storage.origin_extrinsic_metadata_get([origin]) ) assert len(results) == 1, results assert results[0].from_remd_id == b"\x00" * 20