diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index e6581a6..aa2ad3e 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,363 +1,373 @@ # Copyright (C) 2019-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Callable, Dict, Iterator, List, Optional # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group @swh_cli_group.group( name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path( exists=True, dir_okay=False, ), help="Configuration file.", ) @click.pass_context def indexer_cli_group(ctx, config_file): """Software Heritage Indexer tools. The Indexer is used to mine the content of the archive and extract derived information from archive source code artifacts. """ from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = {"cls": "remote", "url": url} elif config_key not in config: raise click.ClickException("Missing configuration for {}".format(config_key)) return getter(**config[config_key]) @indexer_cli_group.group("mapping") def mapping(): """Manage Software Heritage Indexer mappings.""" pass @mapping.command("list") def mapping_list(): """Prints the list of known mappings.""" from swh.indexer import metadata_dictionary mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command("list-terms") @click.option( "--exclude-mapping", multiple=True, help="Exclude the given mapping from the output" ) @click.option( "--concise", is_flag=True, default=False, help="Don't print the list of mappings supporting each term.", ) def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" from swh.indexer import metadata_dictionary properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo("{}:".format(property_name)) click.echo("\t" + ", ".join(sorted(supported_mappings))) @mapping.command("translate") @click.argument("mapping-name") @click.argument("file", type=click.File("rb")) def mapping_translate(mapping_name, file): """Translates file from mapping-name to codemeta format.""" import json from swh.indexer import metadata_dictionary mapping_cls = [ cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name ] if not mapping_cls: raise click.ClickException("Unknown mapping {}".format(mapping_name)) assert len(mapping_cls) == 1 mapping_cls = mapping_cls[0] mapping = mapping_cls() codemeta_doc = mapping.translate(file.read()) click.echo(json.dumps(codemeta_doc, indent=4)) @indexer_cli_group.group("schedule") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API" ) @click.option( "--storage-url", "-g", default=None, help="URL of the (graph) storage API" ) @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate Software Heritage Indexer tasks. Via SWH Scheduler's API.""" from swh.indexer.storage import get_indexer_storage from swh.scheduler import get_scheduler from swh.storage import get_storage ctx.obj["indexer_storage"] = _get_api( get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url ) ctx.obj["storage"] = _get_api( get_storage, ctx.obj["config"], "storage", storage_url ) ctx.obj["scheduler"] = _get_api( get_scheduler, ctx.obj["config"], "scheduler", scheduler_url ) if dry_run: ctx.obj["scheduler"] = None def list_origins_by_producer(idx_storage, mappings, tool_ids) -> Iterator[str]: next_page_token = "" limit = 10000 while next_page_token is not None: result = idx_storage.origin_intrinsic_metadata_search_by_producer( page_token=next_page_token, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None, ) next_page_token = result.next_page_token yield from result.results @schedule.command("reindex_origin_metadata") @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--tool-id", "-t", "tool_ids", type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.", ) @click.option( "--mapping", "-m", "mappings", multiple=True, help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')", ) @click.option( "--task-type", default="index-origin-metadata", show_default=True, help="Name of the task type to schedule.", ) @click.pass_context def schedule_origin_metadata_reindex( ctx, origin_batch_size, tool_ids, mappings, task_type ): """Schedules indexing tasks for origins that were already indexed.""" from swh.scheduler.cli_utils import schedule_origin_batches idx_storage = ctx.obj["indexer_storage"] scheduler = ctx.obj["scheduler"] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) kwargs = {"retries_left": 1} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) @indexer_cli_group.command("journal-client") @click.argument( "indexer", - type=click.Choice(["origin-intrinsic-metadata", "*"]), + type=click.Choice(["origin-intrinsic-metadata", "extrinsic-metadata", "*"]), required=False # TODO: remove required=False after we stop using it ) @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--origin-metadata-task-type", default="index-origin-metadata", help="Name of the task running the origin metadata indexer.", ) @click.option( "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." ) @click.option( "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." ) @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client( ctx, indexer: Optional[str], scheduler_url: str, origin_metadata_task_type: str, brokers: List[str], prefix: str, group_id: str, stop_after_objects: Optional[int], ): """ Listens for new objects from the SWH Journal, and either: * runs the indexer with the name passed as argument, if any * schedules tasks to run relevant indexers (currently, only origin-intrinsic-metadata) on these new objects otherwise. Passing '*' as indexer name runs all indexers. """ import functools import warnings - from swh.indexer.indexer import ObjectsDict + from swh.indexer.indexer import BaseIndexer, ObjectsDict from swh.indexer.journal_client import process_journal_objects from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler cfg = ctx.obj["config"] journal_cfg = cfg.get("journal", {}) scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url) brokers = brokers or journal_cfg.get("brokers") if not brokers: raise ValueError("The brokers configuration is mandatory.") prefix = prefix or journal_cfg.get("prefix") group_id = group_id or journal_cfg.get("group_id") origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get( "origin_metadata_task_type" ) stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects") object_types = set() worker_fns: List[Callable[[ObjectsDict], Dict]] = [] if indexer is None: warnings.warn( "'swh indexer journal-client' with no argument creates scheduler tasks " "to index, rather than index directly.", DeprecationWarning, ) object_types.add("origin_visit_status") worker_fns.append( functools.partial( process_journal_objects, scheduler=scheduler, task_names={ "origin_metadata": origin_metadata_task_type, }, ) ) + idx: Optional[BaseIndexer] = None + if indexer in ("origin-intrinsic-metadata", "*"): from swh.indexer.metadata import OriginMetadataIndexer object_types.add("origin_visit_status") idx = OriginMetadataIndexer() idx.catch_exceptions = False # don't commit offsets if indexation failed worker_fns.append(idx.process_journal_objects) + if indexer in ("extrinsic-metadata", "*"): + from swh.indexer.metadata import ExtrinsicMetadataIndexer + + object_types.add("raw_extrinsic_metadata") + idx = ExtrinsicMetadataIndexer() + idx.catch_exceptions = False # don't commit offsets if indexation failed + worker_fns.append(idx.process_journal_objects) + if not worker_fns: raise click.ClickException(f"Unknown indexer: {indexer}") client = get_journal_client( cls="kafka", brokers=brokers, prefix=prefix, group_id=group_id, object_types=list(object_types), stop_after_objects=stop_after_objects, ) def worker_fn(objects: ObjectsDict): for fn in worker_fns: fn(objects) try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @indexer_cli_group.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5007, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" from swh.indexer.storage.api.server import app, load_and_check_config api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return indexer_cli_group(auto_envvar_prefix="SWH_INDEXER") if __name__ == "__main__": main() diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index cc2a6b2..93c626a 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,658 +1,733 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from functools import reduce import re from typing import Any, Dict, List from unittest.mock import patch +import attr from click.testing import CliRunner from confluent_kafka import Consumer import pytest from swh.indexer.cli import indexer_cli_group from swh.indexer.storage.interface import IndexerStorageInterface from swh.indexer.storage.model import ( DirectoryIntrinsicMetadataRow, + OriginExtrinsicMetadataRow, OriginIntrinsicMetadataRow, ) from swh.journal.writer import get_journal_writer from swh.model.hashutil import hash_to_bytes -from swh.model.model import OriginVisitStatus +from swh.model.model import Origin, OriginVisitStatus +from .test_metadata import REMD from .utils import DIRECTORY2, REVISION def fill_idx_storage(idx_storage: IndexerStorageInterface, nb_rows: int) -> List[int]: tools: List[Dict[str, Any]] = [ { "tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ OriginIntrinsicMetadataRow( id="file://dev/%04d" % origin_id, from_directory=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] directory_metadata = [ DirectoryIntrinsicMetadataRow( id=hash_to_bytes("abcd{:0>36}".format(origin_id)), indexer_configuration_id=tools[origin_id % 2]["id"], metadata={"name": "origin %d" % origin_id}, mappings=["mapping%d" % (origin_id % 10)], ) for origin_id in range(nb_rows) ] idx_storage.directory_intrinsic_metadata_add(directory_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) @pytest.fixture def cli_runner(): return CliRunner() def test_cli_mapping_list(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list"], catch_exceptions=False, ) expected_output = "\n".join( [ "cff", "codemeta", "composer", "gemspec", "github", "maven", "npm", "pkg-info", "pubspec", "", ] # must be sorted for test to pass ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_cli_mapping_list_terms(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_cli_mapping_list_terms_exclude(cli_runner, swh_config): result = cli_runner.invoke( indexer_cli_group, ["-C", swh_config, "mapping", "list-terms", "--exclude-mapping", "codemeta"], catch_exceptions=False, ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_empty_db( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_divisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_dry_run( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "--dry-run", "reindex_origin_metadata", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_nondivisor( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--batch-size", "20", ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_mapping( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_two_mappings( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "--config-file", swh_config, "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], catch_exceptions=False, ) # Check the output expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_cli_origin_metadata_reindex_filter_one_tool( cli_runner, swh_config, indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]), ], catch_exceptions=False, ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def now(): return datetime.datetime.now(tz=datetime.timezone.utc) def test_cli_journal_client_schedule( cli_runner, swh_config, indexer_scheduler, kafka_prefix: str, kafka_server, consumer: Consumer, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), "--origin-metadata-task-type", "index-origin-metadata", ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks(task_type="index-origin-metadata") # This can be split into multiple tasks but no more than the origin-visit-statuses # written in the journal assert len(tasks) <= len(visit_statuses_full) actual_origins = [] for task in tasks: actual_task = dict(task) assert actual_task["type"] == "index-origin-metadata" scheduled_origins = actual_task["arguments"]["args"][0] actual_origins.extend(scheduled_origins) assert set(actual_origins) == {vs.origin for vs in visit_statuses_full} def test_cli_journal_client_without_brokers( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer ): """Without brokers configuration, the cli fails.""" with pytest.raises(ValueError, match="brokers"): cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", ], catch_exceptions=False, ) @pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"]) -def test_cli_journal_client_index( +def test_cli_journal_client_index__origin_intrinsic_metadata( cli_runner, swh_config, kafka_prefix: str, kafka_server, consumer: Consumer, idx_storage, storage, mocker, swh_indexer_config, indexer_name: str, ): """Test the 'swh indexer journal-client' cli tool.""" journal_writer = get_journal_writer( "kafka", brokers=[kafka_server], prefix=kafka_prefix, client_id="test producer", value_sanitizer=lambda object_type, value: value, flush_timeout=3, # fail early if something is going wrong ) visit_statuses = [ OriginVisitStatus( origin="file:///dev/zero", visit=1, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/foobar", visit=2, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///tmp/spamegg", visit=3, date=now(), status="full", snapshot=None, ), OriginVisitStatus( origin="file:///dev/0002", visit=6, date=now(), status="full", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'partial' status origin="file:///dev/0000", visit=4, date=now(), status="partial", snapshot=None, ), OriginVisitStatus( # will be filtered out due to its 'ongoing' status origin="file:///dev/0001", visit=5, date=now(), status="ongoing", snapshot=None, ), ] journal_writer.write_additions("origin_visit_status", visit_statuses) visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"] storage.revision_add([REVISION]) mocker.patch( "swh.indexer.metadata.get_head_swhid", return_value=REVISION.swhid(), ) mocker.patch( "swh.indexer.metadata.DirectoryMetadataIndexer.index", return_value=[ DirectoryIntrinsicMetadataRow( id=DIRECTORY2.id, indexer_configuration_id=1, mappings=["cff"], metadata={"foo": "bar"}, ) ], ) result = cli_runner.invoke( indexer_cli_group, [ "-C", swh_config, "journal-client", indexer_name, "--broker", kafka_server, "--prefix", kafka_prefix, "--group-id", "test-consumer", "--stop-after-objects", len(visit_statuses), ], catch_exceptions=False, ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output results = idx_storage.origin_intrinsic_metadata_get( [status.origin for status in visit_statuses] ) expected_results = [ OriginIntrinsicMetadataRow( id=status.origin, from_directory=DIRECTORY2.id, tool={"id": 1, **swh_indexer_config["tools"]}, mappings=["cff"], metadata={"foo": "bar"}, ) for status in sorted(visit_statuses_full, key=lambda r: r.origin) ] assert sorted(results, key=lambda r: r.id) == expected_results + + +@pytest.mark.parametrize("indexer_name", ["extrinsic-metadata", "*"]) +def test_cli_journal_client_index__origin_extrinsic_metadata( + cli_runner, + swh_config, + kafka_prefix: str, + kafka_server, + consumer: Consumer, + idx_storage, + storage, + mocker, + swh_indexer_config, + indexer_name: str, +): + """Test the 'swh indexer journal-client' cli tool.""" + journal_writer = get_journal_writer( + "kafka", + brokers=[kafka_server], + prefix=kafka_prefix, + client_id="test producer", + value_sanitizer=lambda object_type, value: value, + flush_timeout=3, # fail early if something is going wrong + ) + + origin = Origin("http://example.org/repo.git") + storage.origin_add([origin]) + raw_extrinsic_metadata = attr.evolve(REMD, target=origin.swhid()) + raw_extrinsic_metadata = attr.evolve( + raw_extrinsic_metadata, id=raw_extrinsic_metadata.compute_hash() + ) + journal_writer.write_additions("raw_extrinsic_metadata", [raw_extrinsic_metadata]) + + result = cli_runner.invoke( + indexer_cli_group, + [ + "-C", + swh_config, + "journal-client", + indexer_name, + "--broker", + kafka_server, + "--prefix", + kafka_prefix, + "--group-id", + "test-consumer", + "--stop-after-objects", + 1, + ], + catch_exceptions=False, + ) + + # Check the output + expected_output = "Done.\n" + assert result.exit_code == 0, result.output + assert result.output == expected_output + + results = idx_storage.origin_extrinsic_metadata_get([origin.url]) + expected_results = [ + OriginExtrinsicMetadataRow( + id=origin.url, + from_remd_id=raw_extrinsic_metadata.id, + tool={"id": 1, **swh_indexer_config["tools"]}, + mappings=["github"], + metadata={ + "@context": "https://doi.org/10.5063/schema/codemeta-2.0", + "type": "https://forgefed.org/ns#Repository", + "name": "test software", + }, + ) + ] + assert sorted(results, key=lambda r: r.id) == expected_results