diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -215,7 +215,7 @@
 @indexer_cli_group.command("journal-client")
 @click.argument(
     "indexer",
-    type=click.Choice(["origin-intrinsic-metadata"]),
+    type=click.Choice(["origin-intrinsic-metadata", "*"]),
     required=False
     # TODO: remove required=False after we stop using it
 )
@@ -250,9 +250,15 @@
     group_id: str,
     stop_after_objects: Optional[int],
 ):
-    """Listens for new objects from the SWH Journal, and schedules tasks
-    to run relevant indexers (currently, only origin-intrinsic-metadata)
-    on these new objects."""
+    """
+    Listens for new objects from the SWH Journal, and either:
+
+    * runs the indexer with the name passed as argument, if any
+    * schedules tasks to run relevant indexers (currently, only
+      origin-intrinsic-metadata) on these new objects otherwise.
+
+    Passing '*' as indexer name runs all indexers.
+    """
     import functools
     import warnings
 
@@ -277,7 +283,9 @@
     )
     stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")
 
-    worker_fn: Callable[[ObjectsDict], Dict]
+    # a set, because several of the selected indexers may need the same topic
+    object_types = set()
+    worker_fns: List[Callable[[ObjectsDict], Dict]] = []
 
     if indexer is None:
         warnings.warn(
@@ -285,22 +293,26 @@
             "to index, rather than index directly.",
             DeprecationWarning,
         )
-        object_types = ["origin_visit_status"]
-        worker_fn = functools.partial(
-            process_journal_objects,
-            scheduler=scheduler,
-            task_names={
-                "origin_metadata": origin_metadata_task_type,
-            },
+        object_types.add("origin_visit_status")
+        worker_fns.append(
+            functools.partial(
+                process_journal_objects,
+                scheduler=scheduler,
+                task_names={
+                    "origin_metadata": origin_metadata_task_type,
+                },
+            )
         )
-    elif indexer == "origin-intrinsic-metadata":
+
+    if indexer in ("origin-intrinsic-metadata", "*"):
         from swh.indexer.metadata import OriginMetadataIndexer
 
-        object_types = ["origin_visit_status"]
+        object_types.add("origin_visit_status")
         idx = OriginMetadataIndexer()
         idx.catch_exceptions = False  # don't commit offsets if indexation failed
-        worker_fn = idx.process_journal_objects
-    else:
+        worker_fns.append(idx.process_journal_objects)
+
+    if not worker_fns:
         raise click.ClickException(f"Unknown indexer: {indexer}")
 
     client = get_journal_client(
@@ -308,10 +320,15 @@
         brokers=brokers,
         prefix=prefix,
         group_id=group_id,
-        object_types=object_types,
+        object_types=list(object_types),
         stop_after_objects=stop_after_objects,
     )
 
+    def worker_fn(objects: ObjectsDict) -> None:
+        """Run each selected worker on a batch; their return values are discarded."""
+        for fn in worker_fns:
+            fn(objects)
+
     try:
         client.process(worker_fn)
     except KeyboardInterrupt:
diff --git a/swh/indexer/indexer.py b/swh/indexer/indexer.py
--- a/swh/indexer/indexer.py
+++ b/swh/indexer/indexer.py
@@ -541,10 +541,7 @@
     def process_journal_objects(self, objects: ObjectsDict) -> Dict:
-        """Worker function for ``JournalClient``. Expects ``objects`` to have a single
-        key, either ``origin`` or ``"origin_visit_status"``."""
-        # TODO: add support for subscribing to other topics? Currently, this is
-        # not implemented because no indexer would use it.
-        assert set(objects) <= {"origin", "origin_visit_status"}
-
+        """Worker function for ``JournalClient``. Indexes the origins found under
+        the ``"origin"`` and ``"origin_visit_status"`` keys of ``objects``; other
+        keys are not rejected, so one journal client can feed several indexers."""
         origins = [
             Origin(url=status["origin"])
             for status in objects.get("origin_visit_status", [])
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -527,6 +527,7 @@
     )
 
 
+@pytest.mark.parametrize("indexer_name", ["origin-intrinsic-metadata", "*"])
 def test_cli_journal_client_index(
     cli_runner,
     swh_config,
@@ -537,6 +538,7 @@
     storage,
     mocker,
     swh_indexer_config,
+    indexer_name: str,
 ):
     """Test the 'swh indexer journal-client' cli tool."""
     journal_writer = get_journal_writer(
@@ -619,7 +621,7 @@
             "-C",
             swh_config,
             "journal-client",
-            "origin-intrinsic-metadata",
+            indexer_name,
             "--broker",
             kafka_server,
             "--prefix",